Skip to content

Commit

Permalink
push triggers pull, fix glitches
Browse files Browse the repository at this point in the history
  • Loading branch information
limemloh committed Sep 4, 2019
1 parent 3b4a5ed commit 3a3cfd0
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 95 deletions.
128 changes: 59 additions & 69 deletions src/behavior.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { cons, DoubleLinkedList, Node, fromArray, nil } from "./datastructures";
import { combine } from "./index";
import { combine, isPlaceholder } from "./index";
import { State, Reactive, Time, BListener, Parent, SListener } from "./common";
import { Future, BehaviorFuture } from "./future";
import * as F from "./future";
Expand Down Expand Up @@ -69,27 +69,25 @@ export abstract class Behavior<A> extends Reactive<A, BListener>
}
abstract update(t: number): A;
pushB(t: number): void {
if (this.state === State.Push) {
const newValue = this.update(t);
this.pulledAt = t;
if (this.last !== newValue) {
this.changedAt = t;
this.last = newValue;
pushToChildren(t, this);
}
if (this.changedAt === t) {
// This prevents a second push with same timestamp, to be further pushed
return;
}
this.pull(t);
if (this.changedAt === t && this.state === State.Push) {
pushToChildren(t, this);
}
}
pull(t: number): void {
if (this.pulledAt === undefined || this.pulledAt < t) {
this.pulledAt = t;
let shouldRefresh = this.changedAt === undefined;
for (const parent of this.parents) {
if (isBehavior(parent)) {
if (parent.state !== State.Push && parent.pulledAt !== t) {
parent.pull(t);
}
shouldRefresh = shouldRefresh || parent.changedAt > this.changedAt;
if (!isBehavior(parent)) {
continue;
}
parent.pull(t);
shouldRefresh = shouldRefresh || this.changedAt < parent.changedAt;
}
if (shouldRefresh) {
refresh(this, t);
Expand Down Expand Up @@ -128,39 +126,44 @@ export function pushToChildren(t: number, b: Behavior<any>): void {
}
}

function refresh<A>(b: Behavior<A>, t: number): void {
function refresh<A>(b: Behavior<A>, t: number) {
const newValue = b.update(t);
if (newValue !== b.last) {
b.changedAt = t;
b.last = newValue;
if (newValue === b.last) {
return;
}
b.changedAt = t;
b.last = newValue;
}

export function isBehavior(b: any): b is Behavior<any> {
return typeof b === "object" && "at" in b;
return (
(typeof b === "object" && "at" in b && !isPlaceholder(b)) ||
(isPlaceholder(b) && (b.source === undefined || isBehavior(b.source)))
);
}

export abstract class ProducerBehavior<A> extends Behavior<A> {
newValue(a: A): void {
const changed = a !== this.last;
if (changed) {
const t = tick();
this.last = a;
this.changedAt = t;
if (this.state === State.Push) {
this.pulledAt = t;
pushToChildren(t, this);
}
if (a === this.last) {
return;
}
const t = tick();
this.last = a;
this.changedAt = t;
if (this.state === State.Push) {
this.pulledAt = t;
pushToChildren(t, this);
}
}
pull(t: number): void {
this.last = this.update(t);
pull(t: number) {
refresh(this, t);
}
activate(): void {
activate(t: Time): void {
if (this.state === State.Inactive) {
this.activateProducer();
}
this.state = State.Push;
this.changedAt = t;
}
deactivate(): void {
this.state = State.Inactive;
Expand Down Expand Up @@ -284,17 +287,6 @@ class FlatMapBehavior<A, B> extends Behavior<B> {
super();
this.parents = cons(this.outer);
}
pushB(t: number): void {
const newValue = this.update(t);
this.pulledAt = t;
if (this.last !== newValue) {
this.changedAt = t;
this.last = newValue;
if (this.state === State.Push) {
pushToChildren(t, this);
}
}
}
update(t: number): B {
const outerChanged = this.outer.changedAt > this.changedAt;
if (outerChanged || this.changedAt === undefined) {
Expand Down Expand Up @@ -337,38 +329,34 @@ export function when(b: Behavior<boolean>): Now<Future<{}>> {
}

class SnapshotBehavior<A> extends Behavior<Future<A>> implements SListener<A> {
private afterFuture: boolean;
private node: Node<this> = new Node(this);
constructor(private parent: Behavior<A>, future: Future<any>) {
constructor(private parent: Behavior<A>, private future: Future<any>) {
super();
if (future.state === State.Done) {
// Future has occurred at some point in the past
this.afterFuture = true;
this.state = parent.state;
this.parents = cons(parent);
this.last = Future.of(at(parent));
} else {
this.afterFuture = false;
this.state = State.Push;
this.parents = nil;
this.last = F.sinkFuture<A>();
future.addListener(this.node, tick());
}
}
pushS(t: number, val: A): void {
if (this.afterFuture === false) {
// The push is coming from the Future, it has just occurred.
this.afterFuture = true;
this.last.resolve(at(this.parent));
this.parent.addListener(this.node, t);
this.last.resolve(this.parent.at(t), t);
this.parents = cons(this.parent);
this.changeStateDown(this.state);
this.parent.addListener(this.node, t);
}
update(t: Time): Future<A> {
if (this.future.state === State.Done) {
return Future.of(this.parent.at(t));
} else {
// We are receiving an update from `parent` after `future` has
// occurred.
this.last = Future.of(val);
return this.last;
}
}
update(_t: Time): Future<A> {
return this.last;
}
}

export function snapshotAt<A>(
Expand Down Expand Up @@ -402,12 +390,14 @@ export class FunctionBehavior<A> extends ActiveBehavior<A> {
constructor(private f: (t: Time) => A) {
super();
this.state = State.Pull;
this.parents = nil;
}
pull(t: Time): void {
if (this.pulledAt !== t) {
refresh(this, t);
this.pulledAt = t;
pull(t: Time) {
if (this.pulledAt === t) {
return;
}
this.pulledAt = t;
refresh(this, t);
}
update(t: Time): A {
return this.f(t);
Expand Down Expand Up @@ -435,7 +425,7 @@ class SwitcherBehavior<A> extends ActiveBehavior<A>
this.last = b.last;
next.addListener(this.nNode, t);
}
update(_t: number): A {
update(t: Time): A {
return this.b.last;
}
pushS(t: number, value: Behavior<A>): void {
Expand All @@ -446,11 +436,11 @@ class SwitcherBehavior<A> extends ActiveBehavior<A>
this.b = newB;
this.parents = cons(newB);
newB.addListener(this.bNode, t);
const newState = newB.state;
if (newState !== this.state) {
this.changeStateDown(newState);
this.changeStateDown(newB.state);
refresh(this, t);
if (this.changedAt === t && this.state === State.Push) {
pushToChildren(t, this);
}
this.pushB(t);
}
}

Expand Down Expand Up @@ -532,7 +522,7 @@ class ActiveAccumBehavior<A, B> extends ActiveBehavior<B>
pushToChildren(t, this);
}
}
pull(_t: number): void {}
pull(_t: number) {}
update(_t: number): B {
throw new Error("Update should never be called.");
}
Expand All @@ -553,7 +543,7 @@ export class AccumBehavior<A, B> extends ActiveBehavior<Behavior<B>> {
update(t: number): Behavior<B> {
return new ActiveAccumBehavior(this.f, this.initial, this.source, t);
}
pull(t: Time): void {
pull(t: Time) {
this.last = this.update(t);
this.changedAt = t;
this.pulledAt = t;
Expand Down Expand Up @@ -685,7 +675,7 @@ class MomentBehavior<A> extends Behavior<A> {
}
}
}
pull(t: number): void {
pull(t: number) {
this.pulledAt = t;
refresh(this, t);
}
Expand Down
8 changes: 6 additions & 2 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ export class PushOnlyObserver<A> implements BListener, SListener<A> {
}
}
pushB(t: number): void {
this.callback((this.source as any).last);
const b = <Behavior<A>>this.source;
b.pull(t);
this.callback(b.last);
}
pushS(t: number, value: A): void {
this.callback(value);
Expand Down Expand Up @@ -145,7 +147,9 @@ export class CbObserver<A> implements BListener, SListener<A> {
}
}
pushB(t: number): void {
this.callback((this.source as any).last);
const b = <Behavior<A>>this.source;
b.pull(t);
this.callback(b.last);
}
pushS(t: number, value: A): void {
this.callback(value);
Expand Down
37 changes: 23 additions & 14 deletions src/placeholder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Reactive, State, SListener, BListener } from "./common";
import { Reactive, State, SListener, BListener, Time } from "./common";
import { Behavior, isBehavior, MapBehavior, pushToChildren } from "./behavior";
import { Node, cons } from "./datastructures";
import { Stream, MapToStream } from "./stream";
Expand All @@ -17,12 +17,13 @@ export class Placeholder<A> extends Behavior<A> {
source: Reactive<A, SListener<A> | SListener<A> | BListener>;
private node: Node<this> = new Node(this);
replaceWith(
parent: Reactive<A, SListener<A> | SListener<A> | BListener>
parent: Reactive<A, SListener<A> | SListener<A> | BListener>,
t?: number
): void {
this.source = parent;
this.parents = cons(parent);
if (this.children.head !== undefined) {
const t = tick();
t = t !== undefined ? t : tick();
this.activate(t);
if (isBehavior(parent) && this.state === State.Push) {
pushToChildren(t, this);
Expand All @@ -34,18 +35,14 @@ export class Placeholder<A> extends Behavior<A> {
(<any>child).pushS(t, a);
}
}
pull(t: number): void {
pull(t: number) {
if (this.source === undefined) {
throw new SamplePlaceholderError(this);
} else if (isBehavior(this.source)) {
this.source.pull(t);
this.pulledAt = t;
if (this.source.pulledAt !== t) {
this.source.pull(t);
}
if (this.last !== this.source.last) {
this.changedAt = t;
this.last = this.source.last;
}
this.changedAt = t;
this.last = this.source.last;
} else {
throw new Error("Unsupported pulling on placeholder");
}
Expand All @@ -57,6 +54,7 @@ export class Placeholder<A> extends Behavior<A> {
if (this.source !== undefined) {
this.source.addListener(this.node, t);
if (isBehavior(this.source)) {
this.pull(t);
this.last = this.source.last;
this.changedAt = this.source.changedAt;
this.pulledAt = this.source.pulledAt;
Expand All @@ -78,6 +76,10 @@ export class Placeholder<A> extends Behavior<A> {
}
}

export function isPlaceholder(p): p is Placeholder<any> {
return typeof p === "object" && "replaceWith" in p;
}

class MapPlaceholder<A, B> extends MapBehavior<A, B> {
pushS(t: number, a: A): void {
// @ts-ignore
Expand All @@ -86,11 +88,18 @@ class MapPlaceholder<A, B> extends MapBehavior<A, B> {
}

class MapToPlaceholder<A, B> extends MapToStream<A, B> {
last: B;
update(): B {
changedAt;
constructor(parent, public last: B) {
super(parent, last);
}
update(_t): B {
return (<any>this).b;
}
pull(): void {}
pull(t) {
if (this.changedAt === undefined) {
this.changedAt = t;
}
}
}

function install(target: Function, source: Function): void {
Expand Down
Loading

0 comments on commit 3a3cfd0

Please sign in to comment.