Skip to content

Commit

Permalink
Subjects refactor (#416)
Browse files Browse the repository at this point in the history
* Remove AnonymousSubject

* Make all classes in subjects package extend from Subject

* Rename subjects package to subject

* Align subject signature with observable (scheduler kw only)

* Make subjects mesh with core.Observer
  • Loading branch information
erikkemperman committed Jun 5, 2019
1 parent f234f14 commit 7edf352
Show file tree
Hide file tree
Showing 43 changed files with 378 additions and 516 deletions.
8 changes: 4 additions & 4 deletions docs/reference_subject.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
Subject
========

.. autoclass:: rx.subjects.Subject
.. autoclass:: rx.subject.Subject
:members:

.. autoclass:: rx.subjects.BehaviorSubject
.. autoclass:: rx.subject.BehaviorSubject
:members:

.. autoclass:: rx.subjects.ReplaySubject
.. autoclass:: rx.subject.ReplaySubject
:members:

.. autoclass:: rx.subjects.AsyncSubject
.. autoclass:: rx.subject.AsyncSubject
:members:
2 changes: 1 addition & 1 deletion examples/autocomplete/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from tornado import ioloop

from rx import operators as ops
from rx.subjects import Subject
from rx.subject import Subject
from rx.scheduler.eventloop import IOLoopScheduler

scheduler = IOLoopScheduler(ioloop.IOLoop.current())
Expand Down
2 changes: 1 addition & 1 deletion examples/autocomplete/autocomplete_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from rx import operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler
from rx.subjects import Subject
from rx.subject import Subject


def search_wikipedia(term):
Expand Down
3 changes: 2 additions & 1 deletion examples/autocomplete/bottle_autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from geventwebsocket.handler import WebSocketHandler
import json, requests
import rx
from rx.subject import Subject
from rx.scheduler.eventloop import GEventScheduler

class WikiFinder:
Expand Down Expand Up @@ -38,7 +39,7 @@ def handle_websocket():
wsock = request.environ.get('wsgi.websocket')
if not wsock:
abort(400, 'Expected WebSocket request.')
stream = rx.subjects.Subject()
stream = Subject()
query = stream.map(
lambda x: x["term"]
).filter(
Expand Down
2 changes: 1 addition & 1 deletion examples/chess/chess.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pygame

from rx import operators as ops
from rx.subjects import Subject
from rx.subject import Subject
from rx.scheduler.mainloop import PyGameScheduler


Expand Down
6 changes: 5 additions & 1 deletion examples/konamicode/konamicode.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from tornado import ioloop

from rx import operators as ops
from rx.subjects import Subject
from rx.subject import Subject

UP, DOWN, LEFT, RIGHT, B, A = 38, 40, 37, 39, 66, 65
codes = [UP, UP, DOWN, DOWN, LEFT, RIGHT, LEFT, RIGHT, B, A]


class WSHandler(WebSocketHandler):
def open(self):
print("WebSocket opened")
Expand Down Expand Up @@ -41,10 +42,12 @@ def on_message(self, message):
def on_close(self):
print("WebSocket closed")


class MainHandler(RequestHandler):
def get(self):
self.render("index.html")


def main():
port = os.environ.get("PORT", 8080)
app = Application([
Expand All @@ -56,5 +59,6 @@ def main():
app.listen(port)
ioloop.IOLoop.current().start()


if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion examples/timeflies/timeflies_gtk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import rx
from rx import operators as ops
from rx.subjects import Subject
from rx.subject import Subject
from rx.scheduler.mainloop import GtkScheduler

import gi
Expand Down
2 changes: 1 addition & 1 deletion examples/timeflies/timeflies_qt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import rx
from rx import operators as ops
from rx.subjects import Subject
from rx.subject import Subject
from rx.scheduler.mainloop import QtScheduler

try:
Expand Down
2 changes: 1 addition & 1 deletion examples/timeflies/timeflies_tkinter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import rx
from rx import operators as ops
from rx.subjects import Subject
from rx.subject import Subject
from rx.scheduler.mainloop import TkinterScheduler


Expand Down
2 changes: 1 addition & 1 deletion examples/timeflies/timeflies_wx.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import rx
from rx import operators as ops
from rx.subjects import Subject
from rx.subject import Subject
from rx.scheduler.mainloop import WxScheduler

import wx
Expand Down
2 changes: 1 addition & 1 deletion notebooks/Getting Started.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@
}
],
"source": [
"from rx.subjects import Subject\n",
"from rx.subject import Subject\n",
"\n",
"stream = Subject()\n",
"stream.on_next(41)\n",
Expand Down
4 changes: 3 additions & 1 deletion notebooks/reactivex.io/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
O = rx.Observable

ts_glob = 0 # global start time


def reset_start_time(show_doc_for=None, title=True, sleep=None):
'resets global start time and also prints doc strings'
global ts_glob
Expand Down Expand Up @@ -108,7 +110,7 @@ def _cur():
return _cur()

from rx.scheduler import new_thread_scheduler, timeout_scheduler
from rx.subjects import Subject
from rx.subject import Subject
from rx.testing import marbles, dump
def marble_stream(s):
return O.from_marbles(s).to_blocking()
Expand Down
2 changes: 1 addition & 1 deletion rx/core/abc/subject.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ def on_completed(self):
raise NotImplementedError

@abstractmethod
def subscribe(self, observer=None, scheduler=None):
def subscribe(self, observer=None, *, scheduler=None):
raise NotImplementedError
2 changes: 1 addition & 1 deletion rx/core/observable/toasync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from rx.core import Observable
from rx.core.typing import Scheduler
from rx.scheduler import timeout_scheduler
from rx.subjects import AsyncSubject
from rx.subject import AsyncSubject


def _to_async(func: Callable,
Expand Down
2 changes: 1 addition & 1 deletion rx/core/operators/groupbyuntil.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from rx import operators as ops
from rx.core import Observable, GroupedObservable
from rx.core.typing import Mapper
from rx.subjects import Subject
from rx.subject import Subject
from rx.disposable import CompositeDisposable, RefCountDisposable, SingleAssignmentDisposable
from rx.internal.basic import identity

Expand Down
2 changes: 1 addition & 1 deletion rx/core/operators/groupjoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from rx.core import Observable
from rx.internal.utils import add_ref
from rx.disposable import SingleAssignmentDisposable, RefCountDisposable, CompositeDisposable
from rx.subjects import Subject
from rx.subject import Subject

log = logging.getLogger("Rx")

Expand Down
2 changes: 1 addition & 1 deletion rx/core/operators/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from rx import operators as ops
from rx.core import Observable, ConnectableObservable, pipe
from rx.core.typing import Mapper
from rx.subjects import Subject
from rx.subject import Subject


def _publish(mapper: Optional[Mapper] = None) -> Callable[[Observable], ConnectableObservable]:
Expand Down
2 changes: 1 addition & 1 deletion rx/core/operators/publishvalue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from rx import operators as ops
from rx.core import Observable
from rx.subjects import BehaviorSubject
from rx.subject import BehaviorSubject
from rx.core.typing import Mapper


Expand Down
2 changes: 1 addition & 1 deletion rx/core/operators/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from rx import operators as ops
from rx.core import Observable, ConnectableObservable, typing
from rx.core.typing import Scheduler, Mapper
from rx.subjects import ReplaySubject
from rx.subject import ReplaySubject


def _replay(mapper: Optional[Mapper] = None,
Expand Down
2 changes: 1 addition & 1 deletion rx/core/operators/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from rx.internal import noop
from rx.internal.utils import add_ref
from rx.disposable import SingleAssignmentDisposable, SerialDisposable, CompositeDisposable, RefCountDisposable
from rx.subjects import Subject
from rx.subject import Subject
from rx import operators as ops

log = logging.getLogger("Rx")
Expand Down
2 changes: 1 addition & 1 deletion rx/core/operators/windowwithcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from rx.internal.utils import add_ref
from rx.disposable import SingleAssignmentDisposable, RefCountDisposable
from rx.internal.exceptions import ArgumentOutOfRangeException
from rx.subjects import Subject
from rx.subject import Subject

log = logging.getLogger("Rx")

Expand Down
2 changes: 1 addition & 1 deletion rx/core/operators/windowwithtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from rx.internal.constants import DELTA_ZERO
from rx.internal.utils import add_ref
from rx.disposable import SingleAssignmentDisposable, CompositeDisposable, RefCountDisposable, SerialDisposable
from rx.subjects import Subject
from rx.subject import Subject


def _window_with_time(timespan: typing.RelativeTime, timeshift: Optional[typing.RelativeTime] = None,
Expand Down
2 changes: 1 addition & 1 deletion rx/core/operators/windowwithtimeorcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from rx.scheduler import timeout_scheduler
from rx.internal.utils import add_ref
from rx.disposable import SingleAssignmentDisposable, CompositeDisposable, RefCountDisposable, SerialDisposable
from rx.subjects import Subject
from rx.subject import Subject


def _window_with_time_or_count(timespan: typing.RelativeTime, count: int, scheduler: Optional[typing.Scheduler] = None
Expand Down
2 changes: 1 addition & 1 deletion rx/core/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,5 +148,5 @@ def on_completed(self) -> None:
raise NotImplementedError

@abstractmethod
def subscribe(self, observer: Observer[T_out] = None, scheduler: Scheduler = None) -> Disposable:
def subscribe(self, observer: Observer[T_out] = None, *, scheduler: Scheduler = None) -> Disposable:
raise NotImplementedError
2 changes: 1 addition & 1 deletion rx/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from rx.internal.utils import NotSet
from rx.core import Observable, ConnectableObservable, GroupedObservable, typing, pipe
from rx.core.typing import Mapper, MapperIndexed, Predicate, PredicateIndexed, Comparer, Accumulator
from rx.subjects import Subject
from rx.subject import Subject


def all(predicate: Predicate) -> Callable[[Observable], Observable]:
Expand Down
2 changes: 1 addition & 1 deletion rx/subjects/__init__.py → rx/subject/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .subject import Subject
from .asyncsubject import AsyncSubject
from .behaviorsubject import BehaviorSubject
from .replaysubject import ReplaySubject
from .asyncsubject import AsyncSubject
83 changes: 83 additions & 0 deletions rx/subject/asyncsubject.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from typing import Any, Optional

from rx.disposable import Disposable
from rx.core import typing

from .subject import Subject
from .innersubscription import InnerSubscription


class AsyncSubject(Subject):
"""Represents the result of an asynchronous operation. The last value
before the close notification, or the error received through
on_error, is sent to all subscribed observers."""

def __init__(self) -> None:
"""Creates a subject that can only receive one value and that value is
cached for all future observations."""

super().__init__()

self.value = None
self.has_value = False

def _subscribe_core(self,
observer: typing.Observer,
scheduler: Optional[typing.Scheduler] = None
) -> typing.Disposable:
with self.lock:
self.check_disposed()
if not self.is_stopped:
self.observers.append(observer)
return InnerSubscription(self, observer)

ex = self.exception
has_value = self.has_value
value = self.value

if ex:
observer.on_error(ex)
elif has_value:
observer.on_next(value)
observer.on_completed()
else:
observer.on_completed()

return Disposable()

def _on_next_core(self, value: Any) -> None:
"""Remember the value. Upon completion, the most recently received value
will be passed on to all subscribed observers.
Args:
value: The value to remember until completion
"""
with self.lock:
self.value = value
self.has_value = True

def _on_completed_core(self) -> None:
"""Notifies all subscribed observers of the end of the sequence. The
most recently received value, if any, will now be passed on to all
subscribed observers."""

with self.lock:
observers = self.observers.copy()
self.observers.clear()
value = self.value
has_value = self.has_value

if has_value:
for observer in observers:
observer.on_next(value)
observer.on_completed()
else:
for observer in observers:
observer.on_completed()

def dispose(self) -> None:
"""Unsubscribe all observers and release resources."""

with self.lock:
self.value = None
super().dispose()
Loading

0 comments on commit 7edf352

Please sign in to comment.