diff --git a/docs/reference_subject.rst b/docs/reference_subject.rst index 8c47793e0..090939ee4 100644 --- a/docs/reference_subject.rst +++ b/docs/reference_subject.rst @@ -3,14 +3,14 @@ Subject ======== -.. autoclass:: rx.subjects.Subject +.. autoclass:: rx.subject.Subject :members: -.. autoclass:: rx.subjects.BehaviorSubject +.. autoclass:: rx.subject.BehaviorSubject :members: -.. autoclass:: rx.subjects.ReplaySubject +.. autoclass:: rx.subject.ReplaySubject :members: -.. autoclass:: rx.subjects.AsyncSubject +.. autoclass:: rx.subject.AsyncSubject :members: diff --git a/examples/autocomplete/autocomplete.py b/examples/autocomplete/autocomplete.py index 8f15758dc..4b50c2a94 100644 --- a/examples/autocomplete/autocomplete.py +++ b/examples/autocomplete/autocomplete.py @@ -16,7 +16,7 @@ from tornado import ioloop from rx import operators as ops -from rx.subjects import Subject +from rx.subject import Subject from rx.scheduler.eventloop import IOLoopScheduler scheduler = IOLoopScheduler(ioloop.IOLoop.current()) diff --git a/examples/autocomplete/autocomplete_asyncio.py b/examples/autocomplete/autocomplete_asyncio.py index 086c3aded..0f48c320b 100644 --- a/examples/autocomplete/autocomplete_asyncio.py +++ b/examples/autocomplete/autocomplete_asyncio.py @@ -19,7 +19,7 @@ from rx import operators as ops from rx.scheduler.eventloop import AsyncIOScheduler -from rx.subjects import Subject +from rx.subject import Subject def search_wikipedia(term): diff --git a/examples/autocomplete/bottle_autocomplete.py b/examples/autocomplete/bottle_autocomplete.py index 0aedba885..ca9e7a0c4 100644 --- a/examples/autocomplete/bottle_autocomplete.py +++ b/examples/autocomplete/bottle_autocomplete.py @@ -9,6 +9,7 @@ from geventwebsocket.handler import WebSocketHandler import json, requests import rx +from rx.subject import Subject from rx.scheduler.eventloop import GEventScheduler class WikiFinder: @@ -38,7 +39,7 @@ def handle_websocket(): wsock = request.environ.get('wsgi.websocket') if not wsock: abort(400, 'Expected WebSocket request.') - stream = rx.subjects.Subject() + stream = Subject() query = stream.map( lambda x: x["term"] ).filter( diff --git a/examples/chess/chess.py b/examples/chess/chess.py index 0e6bebf83..0800569f7 100644 --- a/examples/chess/chess.py +++ b/examples/chess/chess.py @@ -4,7 +4,7 @@ import pygame from rx import operators as ops -from rx.subjects import Subject +from rx.subject import Subject from rx.scheduler.mainloop import PyGameScheduler diff --git a/examples/konamicode/konamicode.py b/examples/konamicode/konamicode.py index 8f23d770f..a19da403d 100644 --- a/examples/konamicode/konamicode.py +++ b/examples/konamicode/konamicode.py @@ -6,11 +6,12 @@ from tornado import ioloop from rx import operators as ops -from rx.subjects import Subject +from rx.subject import Subject UP, DOWN, LEFT, RIGHT, B, A = 38, 40, 37, 39, 66, 65 codes = [UP, UP, DOWN, DOWN, LEFT, RIGHT, LEFT, RIGHT, B, A] + class WSHandler(WebSocketHandler): def open(self): print("WebSocket opened") @@ -41,10 +42,12 @@ def on_message(self, message): def on_close(self): print("WebSocket closed") + class MainHandler(RequestHandler): def get(self): self.render("index.html") + def main(): port = os.environ.get("PORT", 8080) app = Application([ @@ -56,5 +59,6 @@ def main(): app.listen(port) ioloop.IOLoop.current().start() + if __name__ == '__main__': main() diff --git a/examples/timeflies/timeflies_gtk.py b/examples/timeflies/timeflies_gtk.py index d5eedc203..f7e0ad8df 100644 --- a/examples/timeflies/timeflies_gtk.py +++ b/examples/timeflies/timeflies_gtk.py @@ -1,6 +1,6 @@ import rx from rx import operators as ops -from rx.subjects import Subject +from rx.subject import Subject from rx.scheduler.mainloop import GtkScheduler import gi diff --git a/examples/timeflies/timeflies_qt.py b/examples/timeflies/timeflies_qt.py index 2f4542a65..6a38d0f23 100644 --- a/examples/timeflies/timeflies_qt.py +++ b/examples/timeflies/timeflies_qt.py @@ -2,7 +2,7 @@ import rx from rx import operators as ops -from rx.subjects import Subject +from rx.subject import Subject from rx.scheduler.mainloop import QtScheduler try: diff --git a/examples/timeflies/timeflies_tkinter.py b/examples/timeflies/timeflies_tkinter.py index 709e396a3..521c15bf1 100644 --- a/examples/timeflies/timeflies_tkinter.py +++ b/examples/timeflies/timeflies_tkinter.py @@ -2,7 +2,7 @@ import rx from rx import operators as ops -from rx.subjects import Subject +from rx.subject import Subject from rx.scheduler.mainloop import TkinterScheduler diff --git a/examples/timeflies/timeflies_wx.py b/examples/timeflies/timeflies_wx.py index 2a2a7d6c1..ec63d104f 100644 --- a/examples/timeflies/timeflies_wx.py +++ b/examples/timeflies/timeflies_wx.py @@ -1,6 +1,6 @@ import rx from rx import operators as ops -from rx.subjects import Subject +from rx.subject import Subject from rx.scheduler.mainloop import WxScheduler import wx diff --git a/notebooks/Getting Started.ipynb b/notebooks/Getting Started.ipynb index 52d763277..47015edbb 100644 --- a/notebooks/Getting Started.ipynb +++ b/notebooks/Getting Started.ipynb @@ -433,7 +433,7 @@ } ], "source": [ - "from rx.subjects import Subject\n", + "from rx.subject import Subject\n", "\n", "stream = Subject()\n", "stream.on_next(41)\n", diff --git a/notebooks/reactivex.io/startup.py b/notebooks/reactivex.io/startup.py index 3fc1580be..808a8c6fe 100644 --- a/notebooks/reactivex.io/startup.py +++ b/notebooks/reactivex.io/startup.py @@ -16,6 +16,8 @@ O = rx.Observable ts_glob = 0 # global start time + + def reset_start_time(show_doc_for=None, title=True, sleep=None): 'resets global start time and also prints doc strings' global ts_glob @@ -108,7 +110,7 @@ def _cur(): return _cur() from rx.scheduler import new_thread_scheduler, timeout_scheduler -from rx.subjects import Subject +from rx.subject import Subject from rx.testing import marbles, dump def marble_stream(s): return O.from_marbles(s).to_blocking() diff --git a/rx/core/abc/subject.py b/rx/core/abc/subject.py index f6c8e4f4e..1986e88d8 100644 --- a/rx/core/abc/subject.py +++ b/rx/core/abc/subject.py @@ -21,5 +21,5 @@ def on_completed(self): raise NotImplementedError @abstractmethod - def subscribe(self, observer=None, scheduler=None): + def subscribe(self, observer=None, *, scheduler=None): raise NotImplementedError diff --git a/rx/core/observable/toasync.py b/rx/core/observable/toasync.py index c4d43160e..39e24f8c7 100644 --- a/rx/core/observable/toasync.py +++ b/rx/core/observable/toasync.py @@ -4,7 +4,7 @@ from rx.core import Observable from rx.core.typing import Scheduler from rx.scheduler import timeout_scheduler -from rx.subjects import AsyncSubject +from rx.subject import AsyncSubject def _to_async(func: Callable, diff --git a/rx/core/operators/groupbyuntil.py b/rx/core/operators/groupbyuntil.py index 137bdf00b..2ef7c77a2 100644 --- a/rx/core/operators/groupbyuntil.py +++ b/rx/core/operators/groupbyuntil.py @@ -4,7 +4,7 @@ from rx import operators as ops from rx.core import Observable, GroupedObservable from rx.core.typing import Mapper -from rx.subjects import Subject +from rx.subject import Subject from rx.disposable import CompositeDisposable, RefCountDisposable, SingleAssignmentDisposable from rx.internal.basic import identity diff --git a/rx/core/operators/groupjoin.py b/rx/core/operators/groupjoin.py index 0916617cd..80a90ece1 100644 --- a/rx/core/operators/groupjoin.py +++ b/rx/core/operators/groupjoin.py @@ -6,7 +6,7 @@ from rx.core import Observable from rx.internal.utils import add_ref from rx.disposable import SingleAssignmentDisposable, RefCountDisposable, CompositeDisposable -from rx.subjects import Subject +from rx.subject import Subject log = logging.getLogger("Rx") diff --git a/rx/core/operators/publish.py b/rx/core/operators/publish.py index c36dede41..293f39351 100644 --- a/rx/core/operators/publish.py +++ b/rx/core/operators/publish.py @@ -3,7 +3,7 @@ from rx import operators as ops from rx.core import Observable, ConnectableObservable, pipe from rx.core.typing import Mapper -from rx.subjects import Subject +from rx.subject import Subject def _publish(mapper: Optional[Mapper] = None) -> Callable[[Observable], ConnectableObservable]: diff --git a/rx/core/operators/publishvalue.py b/rx/core/operators/publishvalue.py index 36eedc662..774d60d3f 100644 --- a/rx/core/operators/publishvalue.py +++ b/rx/core/operators/publishvalue.py @@ -2,7 +2,7 @@ from rx import operators as ops from rx.core import Observable -from rx.subjects import BehaviorSubject +from rx.subject import BehaviorSubject from rx.core.typing import Mapper diff --git a/rx/core/operators/replay.py b/rx/core/operators/replay.py index c7acbf65e..62c94455d 100644 --- a/rx/core/operators/replay.py +++ b/rx/core/operators/replay.py @@ -3,7 +3,7 @@ from rx import operators as ops from rx.core import Observable, ConnectableObservable, typing from rx.core.typing import Scheduler, Mapper -from rx.subjects import ReplaySubject +from rx.subject import ReplaySubject def _replay(mapper: Optional[Mapper] = None, diff --git a/rx/core/operators/window.py b/rx/core/operators/window.py index 3b998721e..d7b610435 100644 --- a/rx/core/operators/window.py +++ b/rx/core/operators/window.py @@ -6,7 +6,7 @@ from rx.internal import noop from rx.internal.utils import add_ref from rx.disposable import SingleAssignmentDisposable, SerialDisposable, CompositeDisposable, RefCountDisposable -from rx.subjects import Subject +from rx.subject import Subject from rx import operators as ops log = logging.getLogger("Rx") diff --git a/rx/core/operators/windowwithcount.py b/rx/core/operators/windowwithcount.py index 4b04d8571..bdc121495 100644 --- a/rx/core/operators/windowwithcount.py +++ b/rx/core/operators/windowwithcount.py @@ -5,7 +5,7 @@ from rx.internal.utils import add_ref from rx.disposable import SingleAssignmentDisposable, RefCountDisposable from rx.internal.exceptions import ArgumentOutOfRangeException -from rx.subjects import Subject +from rx.subject import Subject log = logging.getLogger("Rx") diff --git a/rx/core/operators/windowwithtime.py b/rx/core/operators/windowwithtime.py index 6ca3429ad..8f9ba7f69 100644 --- a/rx/core/operators/windowwithtime.py +++ b/rx/core/operators/windowwithtime.py @@ -6,7 +6,7 @@ from rx.internal.constants import DELTA_ZERO from rx.internal.utils import add_ref from rx.disposable import SingleAssignmentDisposable, CompositeDisposable, RefCountDisposable, SerialDisposable -from rx.subjects import Subject +from rx.subject import Subject def _window_with_time(timespan: typing.RelativeTime, timeshift: Optional[typing.RelativeTime] = None, diff --git a/rx/core/operators/windowwithtimeorcount.py b/rx/core/operators/windowwithtimeorcount.py index 29d1824d5..5e0365801 100644 --- a/rx/core/operators/windowwithtimeorcount.py +++ b/rx/core/operators/windowwithtimeorcount.py @@ -5,7 +5,7 @@ from rx.scheduler import timeout_scheduler from rx.internal.utils import add_ref from rx.disposable import SingleAssignmentDisposable, CompositeDisposable, RefCountDisposable, SerialDisposable -from rx.subjects import Subject +from rx.subject import Subject def _window_with_time_or_count(timespan: typing.RelativeTime, count: int, scheduler: Optional[typing.Scheduler] = None diff --git a/rx/core/typing.py b/rx/core/typing.py index 7c2f0d0ba..bf59272da 100644 --- a/rx/core/typing.py +++ b/rx/core/typing.py @@ -148,5 +148,5 @@ def on_completed(self) -> None: raise NotImplementedError @abstractmethod - def subscribe(self, observer: Observer[T_out] = None, scheduler: Scheduler = None) -> Disposable: + def subscribe(self, observer: Observer[T_out] = None, *, scheduler: Scheduler = None) -> Disposable: raise NotImplementedError diff --git a/rx/operators/__init__.py b/rx/operators/__init__.py index 1621ad39a..e06d234aa 100644 --- a/rx/operators/__init__.py +++ b/rx/operators/__init__.py @@ -7,7 +7,7 @@ from rx.internal.utils import NotSet from rx.core import Observable, ConnectableObservable, GroupedObservable, typing, pipe from rx.core.typing import Mapper, MapperIndexed, Predicate, PredicateIndexed, Comparer, Accumulator -from rx.subjects import Subject +from rx.subject import Subject def all(predicate: Predicate) -> Callable[[Observable], Observable]: diff --git a/rx/subjects/__init__.py b/rx/subject/__init__.py similarity index 74% rename from rx/subjects/__init__.py rename to rx/subject/__init__.py index 706aab191..d3cbaa154 100644 --- a/rx/subjects/__init__.py +++ b/rx/subject/__init__.py @@ -1,4 +1,4 @@ from .subject import Subject +from .asyncsubject import AsyncSubject from .behaviorsubject import BehaviorSubject from .replaysubject import ReplaySubject -from .asyncsubject import AsyncSubject \ No newline at end of file diff --git a/rx/subject/asyncsubject.py b/rx/subject/asyncsubject.py new file mode 100644 index 000000000..78b5bd84e --- /dev/null +++ b/rx/subject/asyncsubject.py @@ -0,0 +1,83 @@ +from typing import Any, Optional + +from rx.disposable import Disposable +from rx.core import typing + +from .subject import Subject +from .innersubscription import InnerSubscription + + +class AsyncSubject(Subject): + """Represents the result of an asynchronous operation. The last value + before the close notification, or the error received through + on_error, is sent to all subscribed observers.""" + + def __init__(self) -> None: + """Creates a subject that can only receive one value and that value is + cached for all future observations.""" + + super().__init__() + + self.value = None + self.has_value = False + + def _subscribe_core(self, + observer: typing.Observer, + scheduler: Optional[typing.Scheduler] = None + ) -> typing.Disposable: + with self.lock: + self.check_disposed() + if not self.is_stopped: + self.observers.append(observer) + return InnerSubscription(self, observer) + + ex = self.exception + has_value = self.has_value + value = self.value + + if ex: + observer.on_error(ex) + elif has_value: + observer.on_next(value) + observer.on_completed() + else: + observer.on_completed() + + return Disposable() + + def _on_next_core(self, value: Any) -> None: + """Remember the value. Upon completion, the most recently received value + will be passed on to all subscribed observers. + + Args: + value: The value to remember until completion + """ + with self.lock: + self.value = value + self.has_value = True + + def _on_completed_core(self) -> None: + """Notifies all subscribed observers of the end of the sequence. The + most recently received value, if any, will now be passed on to all + subscribed observers.""" + + with self.lock: + observers = self.observers.copy() + self.observers.clear() + value = self.value + has_value = self.has_value + + if has_value: + for observer in observers: + observer.on_next(value) + observer.on_completed() + else: + for observer in observers: + observer.on_completed() + + def dispose(self) -> None: + """Unsubscribe all observers and release resources.""" + + with self.lock: + self.value = None + super().dispose() diff --git a/rx/subject/behaviorsubject.py b/rx/subject/behaviorsubject.py new file mode 100644 index 000000000..30277d65e --- /dev/null +++ b/rx/subject/behaviorsubject.py @@ -0,0 +1,63 @@ +from typing import Any + +from rx.disposable import Disposable + +from .subject import Subject +from .innersubscription import InnerSubscription + + +class BehaviorSubject(Subject): + """Represents a value that changes over time. Observers can + subscribe to the subject to receive the last (or initial) value and + all subsequent notifications. + """ + + def __init__(self, value) -> None: + """Initializes a new instance of the BehaviorSubject class which + creates a subject that caches its last value and starts with the + specified value. + + Keyword parameters: + :param T value: Initial value sent to observers when no other + value has been received by the subject yet. + """ + + super().__init__() + + self.value = value + + def _subscribe_core(self, observer, scheduler=None): + with self.lock: + self.check_disposed() + if not self.is_stopped: + self.observers.append(observer) + observer.on_next(self.value) + return InnerSubscription(self, observer) + ex = self.exception + + if ex: + observer.on_error(ex) + else: + observer.on_completed() + + return Disposable() + + def _on_next_core(self, value: Any) -> None: + """Notifies all subscribed observers with the value.""" + with self.lock: + observers = self.observers.copy() + self.value = value + + for observer in observers: + observer.on_next(value) + + def dispose(self) -> None: + """Release all resources. + + Releases all resources used by the current instance of the + ReplaySubject class and unsubscribe all observers. + """ + + with self.lock: + self.value = None + super().dispose() diff --git a/rx/subjects/innersubscription.py b/rx/subject/innersubscription.py similarity index 99% rename from rx/subjects/innersubscription.py rename to rx/subject/innersubscription.py index b862ec521..0af6161a7 100644 --- a/rx/subjects/innersubscription.py +++ b/rx/subject/innersubscription.py @@ -2,6 +2,7 @@ from rx.core import typing + class InnerSubscription(typing.Disposable): def __init__(self, subject, observer): self.subject = subject diff --git a/rx/subject/replaysubject.py b/rx/subject/replaysubject.py new file mode 100644 index 000000000..3930ecbbc --- /dev/null +++ b/rx/subject/replaysubject.py @@ -0,0 +1,135 @@ +import sys + +from datetime import datetime +from typing import cast, Any, Optional, List, NamedTuple +from datetime import timedelta + +from rx.core import typing +from rx.scheduler import current_thread_scheduler +from rx.core.observer.scheduledobserver import ScheduledObserver + +from .subject import Subject + + +class RemovableDisposable(typing.Disposable): + def __init__(self, subject, observer): + self.subject = subject + self.observer = observer + + def dispose(self): + self.observer.dispose() + if not self.subject.is_disposed and self.observer in self.subject.observers: + self.subject.observers.remove(self.observer) + + +class QueueItem(NamedTuple): + interval: datetime + value: Any + + +class ReplaySubject(Subject): + """Represents an object that is both an observable sequence as well + as an observer. Each notification is broadcasted to all subscribed + and future observers, subject to buffer trimming policies. + """ + + def __init__(self, + buffer_size: int = None, + window: typing.RelativeTime = None, + scheduler: Optional[typing.Scheduler] = None + ) -> None: + """Initializes a new instance of the ReplaySubject class with + the specified buffer size, window and scheduler. + + Args: + buffer_size: [Optional] Maximum element count of the replay + buffer. + window [Optional]: Maximum time length of the replay buffer. + scheduler: [Optional] Scheduler the observers are invoked on. + """ + + super().__init__() + self.buffer_size = sys.maxsize if buffer_size is None else buffer_size + self.scheduler = scheduler or current_thread_scheduler + self.window = timedelta.max if window is None else self.scheduler.to_timedelta(window) + self.queue: List[QueueItem] = [] + + def _subscribe_core(self, + observer: typing.Observer, + scheduler: Optional[typing.Scheduler] = None + ) -> typing.Disposable: + so = ScheduledObserver(self.scheduler, observer) + subscription = RemovableDisposable(self, so) + + with self.lock: + self.check_disposed() + self._trim(self.scheduler.now) + self.observers.append(so) + + for item in self.queue: + so.on_next(item.value) + + if self.exception is not None: + so.on_error(self.exception) + elif self.is_stopped: + so.on_completed() + + so.ensure_active() + return subscription + + def _trim(self, now: datetime): + while len(self.queue) > self.buffer_size: + self.queue.pop(0) + + while self.queue and (now - self.queue[0].interval) > self.window: + self.queue.pop(0) + + def _on_next_core(self, value: Any) -> None: + """Notifies all subscribed observers with the value.""" + + with self.lock: + observers = self.observers.copy() + now = self.scheduler.now + self.queue.append(QueueItem(interval=now, value=value)) + self._trim(now) + + for observer in observers: + observer.on_next(value) + + for observer in observers: + cast(ScheduledObserver, observer).ensure_active() + + def _on_error_core(self, error: Exception) -> None: + """Notifies all subscribed observers with the exception.""" + + with self.lock: + observers = self.observers.copy() + self.observers.clear() + self.exception = error + now = self.scheduler.now + self._trim(now) + + for observer in observers: + observer.on_error(error) + cast(ScheduledObserver, observer).ensure_active() + + def _on_completed_core(self) -> None: + """Notifies all subscribed observers of the end of the sequence.""" + + with self.lock: + observers = self.observers.copy() + self.observers.clear() + now = self.scheduler.now + self._trim(now) + + for observer in observers: + observer.on_completed() + cast(ScheduledObserver, observer).ensure_active() + + def dispose(self) -> None: + """Releases all resources used by the current instance of the + ReplaySubject class and unsubscribe all observers.""" + + with self.lock: + self.queue.clear() + super().dispose() diff --git a/rx/subjects/subject.py b/rx/subject/subject.py similarity index 57% rename from rx/subjects/subject.py rename to rx/subject/subject.py index 5e84fe477..db9d9d77d 100644 --- a/rx/subjects/subject.py +++ b/rx/subject/subject.py @@ -2,15 +2,13 @@ from typing import Any, List, Optional from rx.disposable import Disposable -from rx.core.typing import Observer, Scheduler -from rx.core import Observable, typing +from rx.core import Observable, Observer, typing from rx.internal import DisposedException -from .anonymoussubject import AnonymousSubject from .innersubscription import InnerSubscription -class Subject(Observable, Observer): +class Subject(Observable, Observer, typing.Subject): """Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed observers. @@ -20,8 +18,7 @@ def __init__(self) -> None: super().__init__() self.is_disposed = False - self.is_stopped = False - self.observers: List[Observer] = [] + self.observers: List[typing.Observer] = [] self.exception: Optional[Exception] = None self.lock = threading.RLock() @@ -30,35 +27,39 @@ def check_disposed(self) -> None: if self.is_disposed: raise DisposedException() - def _subscribe_core(self, observer: Observer, scheduler: Optional[Scheduler] = None) -> typing.Disposable: + def _subscribe_core(self, + observer: typing.Observer, + scheduler: Optional[typing.Scheduler] = None + ) -> typing.Disposable: with self.lock: self.check_disposed() if not self.is_stopped: self.observers.append(observer) return InnerSubscription(self, observer) - if self.exception: + if self.exception is not None: observer.on_error(self.exception) - return Disposable() - - observer.on_completed() + else: + observer.on_completed() return Disposable() - def on_completed(self) -> None: - """Notifies all subscribed observers of the end of the - sequence.""" + def on_next(self, value: Any) -> None: + """Notifies all subscribed observers with the value. + + Args: + value: The value to send to all subscribed observers. + """ - observers = None with self.lock: self.check_disposed() - if not self.is_stopped: - observers = self.observers[:] - self.observers = [] - self.is_stopped = True + super().on_next(value) - if observers: - for observer in observers: - observer.on_completed() + def _on_next_core(self, value: Any) -> None: + with self.lock: + observers = self.observers.copy() + + for observer in observers: + observer.on_next(value) def on_error(self, error: Exception) -> None: """Notifies all subscribed observers with the exception. @@ -67,34 +68,33 @@ def on_error(self, error: Exception) -> None: error: The exception to send to all subscribed observers. """ - os = None with self.lock: self.check_disposed() - if not self.is_stopped: - os = self.observers[:] - self.observers = [] - self.is_stopped = True - self.exception = error + super().on_error(error) - if os: - for observer in os: - observer.on_error(error) + def _on_error_core(self, error: Exception) -> None: + with self.lock: + observers = self.observers.copy() + self.observers.clear() + self.exception = error - def on_next(self, value: Any) -> None: - """Notifies all subscribed observers with the value. + for observer in observers: + observer.on_error(error) + + def on_completed(self) -> None: + """Notifies all subscribed observers of the end of the sequence.""" - Args: - value: The value to send to all subscribed observers. - """ - os = None with self.lock: self.check_disposed() - if not self.is_stopped: - os = self.observers[:] + super().on_completed() - if os: - for observer in os: - observer.on_next(value) + def _on_completed_core(self) -> None: + with self.lock: + observers = self.observers.copy() + self.observers.clear() + + for observer in observers: + observer.on_completed() def dispose(self) -> None: """Unsubscribe all observers and release resources.""" @@ -102,7 +102,5 @@ def dispose(self) -> None: with self.lock: self.is_disposed = True self.observers = [] - - @classmethod - def create(cls, observer, observable) -> AnonymousSubject: - return AnonymousSubject(observer, observable) + self.exception = None + super().dispose() diff --git a/rx/subjects/anonymoussubject.py b/rx/subjects/anonymoussubject.py deleted file mode 100644 index bad347397..000000000 --- a/rx/subjects/anonymoussubject.py +++ /dev/null @@ -1,25 +0,0 @@ -from typing import Any, Optional - -from rx.core import typing -from rx.core import Observable -from rx.core.typing import Observer, Scheduler - - -class AnonymousSubject(Observable, Observer): - def __init__(self, observer: Observer, observable: Observable) -> None: - super().__init__() - - self.observer = observer - self.observable = observable - - def _subscribe_core(self, observer: Observer, scheduler: Optional[Scheduler] = None) -> typing.Disposable: - return self.observable.subscribe(observer, scheduler=scheduler) - - def on_next(self, value: Any) -> None: - self.observer.on_next(value) - - def on_error(self, error: Exception) -> None: - self.observer.on_error(error) - - def on_completed(self) -> None: - self.observer.on_completed() diff --git a/rx/subjects/asyncsubject.py b/rx/subjects/asyncsubject.py deleted file mode 100644 index 1bf9ebbf0..000000000 --- a/rx/subjects/asyncsubject.py +++ /dev/null @@ -1,108 +0,0 @@ -import threading -from typing import Any, List, Optional - -from rx.disposable import Disposable -from rx.core import Observable -from rx.core.typing import Observer -from rx.internal import DisposedException - -from .innersubscription import InnerSubscription - - -class AsyncSubject(Observable, Observer): - """Represents the result of an asynchronous operation. The last value - before the close notification, or the error received through - on_error, is sent to all subscribed observers.""" - - def __init__(self) -> None: - """Creates a subject that can only receive one value and that value is - cached for all future observations.""" - - super(AsyncSubject, self).__init__() - - self.is_disposed = False - self.is_stopped = False - self.value = None - self.has_value = False - self.observers: List[Observer] = [] - self.exception: Optional[Exception] = None - - self.lock = threading.RLock() - - def check_disposed(self) -> None: - if self.is_disposed: - raise DisposedException() - - def _subscribe_core(self, observer: Observer, scheduler=None): - with self.lock: - self.check_disposed() - if not self.is_stopped: - self.observers.append(observer) - return InnerSubscription(self, observer) - - ex = self.exception - hv = self.has_value - v = self.value - - if ex: - observer.on_error(ex) - elif hv: - observer.on_next(v) - observer.on_completed() - else: - observer.on_completed() - - return Disposable() - - def on_completed(self) -> None: - value = None - os = None - hv = None - - with self.lock: - self.check_disposed() - if not self.is_stopped: - os = self.observers[:] - self.observers = [] - - self.is_stopped = True - value = self.value - hv = self.has_value - - if os: - if hv: - for o in os: - o.on_next(value) - o.on_completed() - else: - for o in os: - o.on_completed() - - def on_error(self, error: Exception) -> None: - os = None - - with self.lock: - self.check_disposed() - if not self.is_stopped: - os = self.observers[:] - self.observers = [] - self.is_stopped = True - self.exception = error - - if os: - for o in os: - o.on_error(error) - - def on_next(self, value: Any) -> None: - with self.lock: - self.check_disposed() - if not self.is_stopped: - self.value = value - self.has_value = True - - def dispose(self) -> None: - with self.lock: - self.is_disposed = True - self.observers = [] - self.exception = None - self.value = None diff --git a/rx/subjects/behaviorsubject.py b/rx/subjects/behaviorsubject.py deleted file mode 100644 index 93ee7c65f..000000000 --- a/rx/subjects/behaviorsubject.py +++ /dev/null @@ -1,112 +0,0 @@ -from typing import Any -import threading - -from rx.disposable import Disposable -from rx.core import Observable -from rx.core.typing import Observer -from rx.internal import DisposedException - -from .innersubscription import InnerSubscription - - -class BehaviorSubject(Observable, Observer): - """Represents a value that changes over time. Observers can - subscribe to the subject to receive the last (or initial) value and - all subsequent notifications. - """ - - def __init__(self, value): - """Initializes a new instance of the BehaviorSubject class which - creates a subject that caches its last value and starts with the - specified value. - - Keyword parameters: - :param T value: Initial value sent to observers when no other - value has been received by the subject yet. - """ - - super(BehaviorSubject, self).__init__() - - self.value = value - self.observers = [] - self.is_disposed = False - self.is_stopped = False - self.exception = None - - self.lock = threading.RLock() - - def check_disposed(self): - if self.is_disposed: - raise DisposedException() - - def _subscribe_core(self, observer, scheduler=None): - ex = None - - with self.lock: - self.check_disposed() - if not self.is_stopped: - self.observers.append(observer) - observer.on_next(self.value) - return InnerSubscription(self, observer) - ex = self.exception - - if ex: - observer.on_error(ex) - else: - observer.on_completed() - - return Disposable() - - def on_completed(self) -> None: - """Notifies all subscribed observers of the end of the sequence.""" - - os = None - with self.lock: - self.check_disposed() - if not self.is_stopped: - os = self.observers[:] - self.observers = [] - self.is_stopped = True - - if os: - for o in os: - o.on_completed() - - def on_error(self, error: Exception) -> None: - """Notifie all subscribed observers with the exception.""" - os = None - with self.lock: - self.check_disposed() - if not self.is_stopped: - os = self.observers[:] - self.observers = [] - self.is_stopped = True - self.exception = error - - if os: - for o in os: - o.on_error(error) - - def on_next(self, value: Any) -> None: - """Notifie all subscribed observers with the value.""" - os = None - with self.lock: - self.check_disposed() - if not self.is_stopped: - os = self.observers[:] - self.value = value - if os: - for o in os: - o.on_next(value) - - def dispose(self) -> None: - """Release all resources. - - Releases all resources used by the current instance of the - ReplaySubject class and unsubscribe all observers. - """ - with self.lock: - self.is_disposed = True - self.observers = None - self.value = None - self.exception = None diff --git a/rx/subjects/replaysubject.py b/rx/subjects/replaysubject.py deleted file mode 100644 index 69a448839..000000000 --- a/rx/subjects/replaysubject.py +++ /dev/null @@ -1,150 +0,0 @@ -import sys -import threading -from typing import Any, Optional, List -from datetime import timedelta - -from rx.core import Observable, typing -from rx.core.typing import Observer -from rx.internal import DisposedException -from rx.scheduler import current_thread_scheduler -from rx.core.observer.scheduledobserver import ScheduledObserver - - -class RemovableDisposable: - def __init__(self, subject, observer): - self.subject = subject - self.observer = observer - - def dispose(self): - self.observer.dispose() - if not self.subject.is_disposed and self.observer in self.subject.observers: - self.subject.observers.remove(self.observer) - - -class ReplaySubject(Observable, Observer): - """Represents an object that is both an observable sequence as well - as an observer. Each notification is broadcasted to all subscribed - and future observers, subject to buffer trimming policies. - """ - - def __init__(self, buffer_size: int = None, window: typing.RelativeTime = None, scheduler: typing.Scheduler = None): - """Initializes a new instance of the ReplaySubject class with - the specified buffer size, window and scheduler. - - Args: - buffer_size: [Optional] Maximum element count of the replay - buffer. - window [Optional]: Maximum time length of the replay buffer. - scheduler: [Optional] Scheduler the observers are invoked on. - """ - - self.buffer_size = sys.maxsize if buffer_size is None else buffer_size - self.scheduler = scheduler or current_thread_scheduler - self.window = timedelta.max if window is None else self.scheduler.to_timedelta(window) - self.queue: List[Any] = [] - self.observers: List[ScheduledObserver] = [] - self.is_stopped = False - self.is_disposed = False - self.has_error = False - self.error: Optional[Exception] = None - - self.lock = threading.RLock() - - super(ReplaySubject, self).__init__() - - def check_disposed(self): - if self.is_disposed: - raise DisposedException() - - def _subscribe_core(self, observer, scheduler=None): - so = ScheduledObserver(self.scheduler, observer) - subscription = RemovableDisposable(self, so) - - with self.lock: - self.check_disposed() - self._trim(self.scheduler.now) - self.observers.append(so) - - for item in self.queue: - so.on_next(item['value']) - - if self.has_error: - so.on_error(self.error) - elif self.is_stopped: - so.on_completed() - - so.ensure_active() - return subscription - - def _trim(self, now): - while len(self.queue) > self.buffer_size: - self.queue.pop(0) - - while self.queue and (now - self.queue[0]['interval']) > self.window: - self.queue.pop(0) - - def on_next(self, value: Any) -> None: - """Notifies all subscribed observers with the value.""" - - os = None - with self.lock: - self.check_disposed() - if not self.is_stopped: - os = self.observers[:] - now = self.scheduler.now - self.queue.append(dict(interval=now, value=value)) - self._trim(now) - - for observer in os: - observer.on_next(value) - if os: - for observer in os: - observer.ensure_active() - - def on_error(self, error: Exception) -> None: - """Notifies all subscribed observers with the exception.""" - - os = None - with self.lock: - self.check_disposed() - if not self.is_stopped: - os = self.observers[:] - self.observers = [] - self.is_stopped = True - self.error = error - self.has_error = True - now = self.scheduler.now - self._trim(now) - - for observer in os: - observer.on_error(error) - if os: - for observer in os: - observer.ensure_active() - - def on_completed(self) -> None: - """Notifies all subscribed observers of the end of the sequence.""" - - os = None - with self.lock: - self.check_disposed() - if not self.is_stopped: - os = self.observers[:] - self.observers = [] - self.is_stopped = True - now = self.scheduler.now - self._trim(now) - for observer in os: - observer.on_completed() - if os: - for observer in os: - observer.ensure_active() - - def dispose(self) -> None: - """Releases all resources used by the current instance of the - ReplaySubject class and unsubscribe all observers.""" - - with self.lock: - self.is_disposed = True - self.observers = [] - self.queue = [] diff --git a/setup.py b/setup.py index 9d7e8bb14..bf85a0aca 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ 'rx.core.operators', 'rx.core.operators.connectable', 'rx.core.observer', 'rx.core.observable', 'rx.scheduler', 'rx.scheduler.eventloop', 'rx.scheduler.mainloop', - 'rx.operators', 'rx.disposable', 'rx.subjects', + 'rx.operators', 'rx.disposable', 'rx.subject', 'rx.testing'], package_dir={'rx': 'rx'}, include_package_data=True diff --git a/tests/test_observable/test_connectableobservable.py b/tests/test_observable/test_connectableobservable.py index 66fe20407..007091453 100644 --- a/tests/test_observable/test_connectableobservable.py +++ b/tests/test_observable/test_connectableobservable.py @@ -5,7 +5,7 @@ from rx.core import Observable from rx.core.typing import Observer from rx.testing import TestScheduler, ReactiveTest -from rx.subjects import Subject +from rx.subject import Subject from rx.core import ConnectableObservable on_next = ReactiveTest.on_next diff --git a/tests/test_observable/test_multicast.py b/tests/test_observable/test_multicast.py index 930eec1c8..c0183611d 100644 --- a/tests/test_observable/test_multicast.py +++ b/tests/test_observable/test_multicast.py @@ -2,7 +2,7 @@ import pytest from rx import operators as ops -from rx.subjects import Subject +from rx.subject import Subject from rx.testing import TestScheduler, ReactiveTest on_next = ReactiveTest.on_next diff --git a/tests/test_subjects/__init__.py b/tests/test_subject/__init__.py similarity index 100% rename from tests/test_subjects/__init__.py rename to tests/test_subject/__init__.py diff --git a/tests/test_subjects/test_asyncsubject.py b/tests/test_subject/test_asyncsubject.py similarity index 99% rename from tests/test_subjects/test_asyncsubject.py rename to tests/test_subject/test_asyncsubject.py index 804fe1269..4393b39f8 100644 --- a/tests/test_subjects/test_asyncsubject.py +++ b/tests/test_subject/test_asyncsubject.py @@ -1,7 +1,7 @@ import pytest from rx.testing import TestScheduler, ReactiveTest -from rx.subjects import AsyncSubject +from rx.subject import AsyncSubject from rx.internal.exceptions import DisposedException on_next = ReactiveTest.on_next @@ -164,6 +164,7 @@ def action10(scheduler, state=None): assert results2.messages == [on_next(630, 7), on_completed(630)] assert results3.messages == [on_next(900, 7), on_completed(900)] + def test_error(): subject = [None] subscription = [None] @@ -300,6 +301,7 @@ def action10(scheduler, state=None): assert results2.messages == [on_completed(630)] assert results3.messages == [on_completed(900)] + def test_subject_disposed(): subject = [None] subscription1 = [None] @@ -388,5 +390,6 @@ def action17(scheduler, state=None): assert results2.messages == [] assert results3.messages == [] + if __name__ == '__main__': unittest.main() diff --git a/tests/test_subjects/test_behaviorsubject.py b/tests/test_subject/test_behaviorsubject.py similarity index 99% rename from tests/test_subjects/test_behaviorsubject.py rename to tests/test_subject/test_behaviorsubject.py index 8ec86a5d6..a8eeb49c8 100644 --- a/tests/test_subjects/test_behaviorsubject.py +++ b/tests/test_subject/test_behaviorsubject.py @@ -1,7 +1,7 @@ import pytest from rx.testing import TestScheduler, ReactiveTest -from rx.subjects import BehaviorSubject +from rx.subject import BehaviorSubject from rx.internal.exceptions import DisposedException on_next = ReactiveTest.on_next @@ -280,6 +280,7 @@ def action10(scheduler, state=None): assert results3.messages == [ on_error(900, ex)] + def test_canceled(): scheduler = TestScheduler() @@ -352,6 +353,7 @@ def action10(scheduler, state=None): assert results3.messages == [ on_completed(900)] + def test_subject_disposed(): scheduler = TestScheduler() diff --git a/tests/test_subjects/test_replaysubject.py b/tests/test_subject/test_replaysubject.py similarity index 99% rename from tests/test_subjects/test_replaysubject.py rename to tests/test_subject/test_replaysubject.py index 4b13905d8..a925b1107 100644 --- a/tests/test_subjects/test_replaysubject.py +++ b/tests/test_subject/test_replaysubject.py @@ -2,7 +2,7 @@ import pytest from rx.testing import TestScheduler, ReactiveTest -from rx.subjects import ReplaySubject +from rx.subject import ReplaySubject from rx.internal.exceptions import DisposedException on_next = ReactiveTest.on_next @@ -201,6 +201,7 @@ def action10(scheduler, state=None): on_next(900, 10), on_next(940, 11)] + def test_finite(): scheduler = TestScheduler() @@ -445,6 +446,7 @@ def action10(scheduler, state=None): assert results3.messages == [ on_completed(900)] + def test_subject_disposed(): scheduler = TestScheduler() diff --git a/tests/test_subjects/test_subject.py b/tests/test_subject/test_subject.py similarity index 93% rename from tests/test_subjects/test_subject.py rename to tests/test_subject/test_subject.py index 353b9a018..d375716a9 100644 --- a/tests/test_subjects/test_subject.py +++ b/tests/test_subject/test_subject.py @@ -1,8 +1,5 @@ -import rx -from rx.core import Observable, Observer - from rx.testing import TestScheduler, ReactiveTest -from rx.subjects import Subject +from rx.subject import Subject on_next = ReactiveTest.on_next on_completed = ReactiveTest.on_completed @@ -16,10 +13,12 @@ class RxException(Exception): pass + # Helper function for raising exceptions within lambdas def _raise(ex): raise RxException(ex) + def test_infinite(): subscription = [None] subscription1 = [None] @@ -320,39 +319,3 @@ def action10(scheduler, state=None): assert results1.messages == [] assert results2.messages == [on_completed(630)] assert results3.messages == [on_completed(900)] - - -def test_subject_create(): - _x = [None] - _ex = [None] - done = False - - def on_next(x): - _x[0] = x - - def on_error(ex): - _ex[0] = ex - - def on_completed(): - done = True - - v = Observer(on_next, on_error, on_completed) - - o = rx.return_value(42) - - s = Subject.create(v, o) - - def on_next2(x): - _x[0] = x - s.subscribe(on_next2) - - assert(42 == _x[0]) - s.on_next(21) - - e = 'ex' - s.on_error(e) - - assert(e == _ex[0]) - - s.on_completed() - assert(not done)