Skip to content

Commit

Permalink
Fix timer
Browse files Browse the repository at this point in the history
  • Loading branch information
mat committed Sep 5, 2023
1 parent 8263b1e commit 264ceb0
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
3 changes: 3 additions & 0 deletions reactivex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,9 @@ def timer(
[ timer(2) ]
--0-|
[ timer(2, 4) ]
--0----1----2--
Examples:
>>> res = reactivex.timer(datetime(...))
>>> res = reactivex.timer(datetime(...), 0.1)
Expand Down
10 changes: 5 additions & 5 deletions reactivex/observable/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ def subscribe(
observer: abc.ObserverBase[int], scheduler_: Optional[abc.SchedulerBase] = None
) -> abc.DisposableBase:
_scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
nonlocal duetime
due_time = duetime

if not isinstance(duetime, datetime):
duetime = _scheduler.now + _scheduler.to_timedelta(duetime)
if not isinstance(due_time, datetime):
due_time = _scheduler.now + _scheduler.to_timedelta(due_time)

p = max(0.0, _scheduler.to_seconds(period))
mad = MultipleAssignmentDisposable()
dt = duetime
dt = due_time
count = 0

def action(scheduler: abc.SchedulerBase, state: Any) -> None:
Expand Down Expand Up @@ -107,7 +107,7 @@ def action(count: Optional[int] = None) -> Optional[int]:
return None

if not isinstance(_scheduler, PeriodicScheduler):
raise ValueError("Sceduler must be PeriodicScheduler")
raise ValueError("Scheduler must be PeriodicScheduler")
return _scheduler.schedule_periodic(period, action, state=0)

return Observable(subscribe)
Expand Down
51 changes: 51 additions & 0 deletions tests/test_observable/test_timer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import unittest

import reactivex
from reactivex import operators
from reactivex.testing import ReactiveTest, TestScheduler

on_next = ReactiveTest.on_next
Expand Down Expand Up @@ -126,3 +127,53 @@ def create():

results = scheduler.start(create)
assert results.messages == [on_next(500, 0), on_next(800, 1)]

def test_periodic_timer_second_subscription(self):
scheduler = TestScheduler()
t = reactivex.timer(duetime=200, period=300, scheduler=scheduler)

def create():
return reactivex.merge(
t.pipe(operators.map(lambda x: (x, "first"))),
reactivex.concat(reactivex.timer(100, scheduler=scheduler), t).pipe(
operators.map(lambda x: (x, "second"))
),
)

results = scheduler.start(create)
assert results.messages == [
on_next(300, (0, "second")),
on_next(400, (0, "first")),
on_next(500, (0, "second")),
on_next(700, (1, "first")),
on_next(800, (1, "second")),
]

def test_on_off_timer_repeat(self):
scheduler = TestScheduler()
t = reactivex.timer(duetime=230, scheduler=scheduler)

def create():
return t.pipe(operators.repeat())

results = scheduler.start(create)
assert results.messages == [
on_next(430, 0),
on_next(660, 0),
on_next(890, 0),
]

def test_periodic_timer_repeat(self):
scheduler = TestScheduler()
t = reactivex.timer(duetime=130, period=200, scheduler=scheduler)

def create():
return t.pipe(operators.take(3), operators.repeat())

results = scheduler.start(create)
assert results.messages == [
on_next(330, 0),
on_next(530, 1),
on_next(730, 2),
on_next(860, 0),
]

0 comments on commit 264ceb0

Please sign in to comment.