Vova Bilyachat

Melbourne, Australia

RxJS - Why its so amazing or how to write own operator

13 December 2016

Introduction

When I started with observable I did not get why RxJs is so popular since I thought they are implementing observer pattern which was long time ago. Also I did not understand why people always call them stream or flow, so this is how my journey started with RxJs.
It’s a huge topic about what is included and how it works.  I will generally concentrate on operators this time since this on of features makes RxJs so amazing. http://reactivex.io/documentation/observable.html

Other interesting topic for me is Subjects http://xgrommx.github.io/rx-book/content/subjects/index.html,

1. private _eventFlowStopper = new BehaviorSubject<boolean>(false);
2. private _eventsObservable: Subject<EventFlow> = new Subject<EventFlow>();

BehaviorSubject always require initial value and could be used for cases such as settings, when application on initialization gets some default value. In addition difference is that when we subscribe to Subject, subscribe event will be emited only if there is something new broadcasted after subscription, meanwhile BehaviorSubject will send data immediately after subscription. So as soon as any part of app will subscribe to eventFlowStopper  it means that it will get latest value in this case it will be false since noting was send to eventFlowStopper

Operators  

It does not make sense to copy/paste info to here from official doc http://reactivex.io/documentation/operators.html  so read it :)
But I would like to mentioned why this  part is worse to study in RxJs
- Chaining - you can chain observable which gives you different way to make your own flow
- Many built in operators - If you want to delay event, debounce, remap its already there you can use it straight away
- You can write your own - Yes if you have special case you can write your own operator

How it works

Observable object have method lift (https://github.com/ReactiveX/rxjs/blob/master/src/Observable.ts#L67) which create new observable so it means that if we apply filter (http://xgrommx.github.io/rx-book/content/observable/observable_instance_methods/filter.html) then we will end up with two Observable streams.

Let's play: Requirements

Reading is good but practice is always much better actually when I wrote this simple app I discovered lots of small details :)
So let's assume we have Application where event flow throw channel
- If event of type "First" is send it means that channel should be paused for some time
- If event of type "Second" is send it means that from now and till this event is disabled, only type Second is allowed and type First goes to buffer;
This pretty simple what I am doing, in my app I have observable which is receiving events from SignalR. And that operator will APPLY logic to stream such as delay or allow specific type.

Operator

Full source code for operator you can see at (https://github.com/vovikdrg/rxjs_operators/blob/master/src/app/event.flow.operator.ts).
Since I have done my functionality I will go reverse way as I did while implementing.
I like the possibility to extend of course I am lazy and I expect framework to have all I need but if in some cases it is not implemented I don’t want to look for other lib I want to extend what I am using.

Firstly lets create method definition who will create new observable with EventFlowOperator.

export function controlledEventFlow(this: Observable<EventFlow>, stopNotifier: Observable<boolean>, allowTypeOnly: Observable<string>): Observable<EventFlow> {
    return this.lift(new EventFlowOperator(stopNotifier, allowTypeOnly));
}
- this: Observable<EventFlow> - this will limit usage to observable of type Event flow
- stopNotifier: Observable<boolean> - this will notify subscriber that no events are allowed to send
- allowTypeOnly: Observable<string> - notify subscriber that from now only this type is allowed to send



Now to allow typescript to compile we need to extend observable

declare module 'rxjs/Observable' {
    interface Observable<T> {
        controlledEventFlow: typeof controlledEventFlow;
    }
}

And prototype created function

Observable.prototype.controlledEventFlow = controlledEventFlow;

Then lets create our operator

class EventFlowOperator implements Operator<EventFlow, EventFlow> {
    constructor(private stopNotifier: Observable<boolean>, private allowTypeOnly: Observable<string>) {
    }

    call(subscriber: Subscriber<EventFlow>, source: any): any {
        return source._subscribe(new EventFlowSubscriber(subscriber, this.stopNotifier, this.allowTypeOnly));
    }
}

NOTE: call method will be called so many times as we subscribe to this observable it means that EventFlowSubscriber will be created for each subscription.

EventFlowSubscriber
- Constructor - here we subscribe to notifier members to be able to lock and set filters
○ Both notifers have debounce operator to make sure that only latest is delivered.
- _next - overriding base method to push data to buffer and call method to send data
- deliverNext - this method will send data to subscribers

Components

Components are responsible to handle specific event type and pause or enable filtering of specific types.
When I started my app I have used setTimeout for delay but later I have discovered that observable actually has TimerObservable.create which allows to create timeouts and intervals.

 this._timer = TimerObservable.create(2000);
this._eventService
      .events
      .filter(e => e.type == 'First')
      .subscribe(e => {
        console.log(e);
        this.events.push(e);
        this._eventService.eventFlowStopper.next(true);
        var subscription = this._timer.subscribe((n) => {
          subscription.unsubscribe();
          this._eventService.eventFlowStopper.next(false);
        })
      });

 TimerObservable is cold observable which means that nothing is broadcasted until it gets subscriber.

Event Service 

Case


  private _eventsObservable: Observable<EventFlow>

  constructor() {
    this._eventsObservable = this.PushEvents
      .controlledEventFlow(this._eventFlowStopper, this._allowType);  
  }

Startup

When app is runned two components are subscribed to eventsObservable via public get events(): Observable<EventFlow> and as I mentioned before two EventFlowSubscriber will be created because of   return source._subscribe(new EventFlowSubscriber(subscriber, this.stopNotifier, this.allowTypeOnly));
Events
PushEvents.next(new EventFlow("First", new Date()));
PushEvents.next(new EventFlow("First", new Date()));
PushEvents.next(new EventFlow("Second", new Date()));

Flow

1. EventFlowSubscriber._next pushed event to buffer and called deliverNext
2. EventFlowSubscriber.deliverNext will deliver event to DisplayFirstTypeComponent because all if statements are false.
3. After DisplayFirstTypeComponent received data it sends eventFlowStopper.next(true); this means that now value is true
4. And here is trick since we have 2 subscribers then EventFlowSubscriber._next is called for DisplaySecondTypeComponent and now this.flowStopper is true and it means that event is not delivered to component + state of both buffers is different in first subscriber is now -1 item and second will stay the same.
5. After 2 seconds DisplayFirstTypeComponent will emit false to stopNotifier and First subscriber will deliver next message to First component components, which will switch back stopNotifier to true, but here is one more important thing is that Second subscriber will receive stopNotifier as true and then immediately  false. Look at time 54.450.

Solution

Well here is pretty easy solution in the service I can create one more observable where we will put data from controlledEventFlow subscription.
Change code in service to

  private _eventFlowStopper = new BehaviorSubject<boolean>(false);
  private _allowType = new BehaviorSubject<string>("");
  private _eventsObservable: Subject<EventFlow> = new Subject<EventFlow>();

  constructor() {
    this.PushEvents
      .controlledEventFlow(this._eventFlowStopper, this._allowType)  
      .subscribe(ev=> this._eventsObservable.next(ev) );
  }

And now I get behaviour I need


Source code