Introduction
When I started with observable I did not get why RxJs is so popular
since I thought they are implementing observer pattern which was long time ago. Also I did not understand why
people always call them stream or flow, so this is how my journey started with RxJs.
It’s a huge topic about
what is included and how it works. I will generally concentrate on operators this time since this on of
features makes RxJs so amazing. http://reactivex.io/documentation/observable.html
Other interesting
topic for me is Subjects http://xgrommx.github.io/rx-book/content/subjects/index.html,
1. private _eventFlowStopper = new BehaviorSubject<boolean>(false);
2. private _eventsObservable: Subject<EventFlow>
= new Subject<EventFlow>();
BehaviorSubject always require initial value and could be used for
cases such as settings, when application on initialization gets some default value. In addition difference is that
when we subscribe to Subject, subscribe event will be emited only if there is something new broadcasted after
subscription, meanwhile BehaviorSubject will send data immediately after subscription. So as soon as any part of
app will subscribe to eventFlowStopper it means that it will get latest value in this case it will be false
since noting was send to eventFlowStopper
Operators
It does not make sense to copy/paste info to here from official
doc http://reactivex.io/documentation/operators.html so read it :)
But I would like to mentioned why
this part is worse to study in RxJs
-
Chaining - you can chain observable which gives you different way to make your own flow
- Many built in operators - If you want to delay event, debounce, remap its
already there you can use it straight away
-
You can write your own - Yes if you have special case you can write your own operator
How it works
Observable object have method lift
(https://github.com/ReactiveX/rxjs/blob/master/src/Observable.ts#L67) which create new observable so it means that
if we apply filter (http://xgrommx.github.io/rx-book/content/observable/observable_instance_methods/filter.html)
then we will end up with two Observable streams.
Let's play: Requirements
Reading is good but practice is always much better
actually when I wrote this simple app I discovered lots of small details :)
So let's assume we have
Application where event flow throw channel
- If
event of type "First" is send it means that channel should be paused for some time
- If event of type "Second" is send it means that from now and till this
event is disabled, only type Second is allowed and type First goes to buffer;
This pretty simple what I am
doing, in my app I have observable which is receiving events from SignalR. And that operator will APPLY logic to
stream such as delay or allow specific type.
Operator
Full source code for operator you can see at
(https://github.com/vovikdrg/rxjs_operators/blob/master/src/app/event.flow.operator.ts).
Since I have done my
functionality I will go reverse way as I did while implementing.
I like the possibility to extend of course I
am lazy and I expect framework to have all I need but if in some cases it is not implemented I don’t want to look
for other lib I want to extend what I am using.
Firstly lets create method definition who will create
new observable with EventFlowOperator.
export function controlledEventFlow(this:
Observable<EventFlow>, stopNotifier: Observable<boolean>, allowTypeOnly: Observable<string>):
Observable<EventFlow> {
return this.lift(new EventFlowOperator(stopNotifier,
allowTypeOnly));
}
- this:
Observable<EventFlow> - this will limit usage to observable of type Event flow
- stopNotifier: Observable<boolean> - this will notify subscriber that
no events are allowed to send
- allowTypeOnly:
Observable<string> - notify subscriber that from now only this type is allowed to send
Now
to allow typescript to compile we need to extend observable
declare module 'rxjs/Observable' {
interface Observable<T> {
controlledEventFlow: typeof
controlledEventFlow;
}
}
And prototype created function
Observable.prototype.controlledEventFlow
= controlledEventFlow;
Then lets create our operator
class EventFlowOperator implements
Operator<EventFlow, EventFlow> {
constructor(private stopNotifier:
Observable<boolean>, private allowTypeOnly: Observable<string>) {
}
call(subscriber: Subscriber<EventFlow>, source: any): any {
return
source._subscribe(new EventFlowSubscriber(subscriber, this.stopNotifier, this.allowTypeOnly));
}
}
NOTE:
call method will be called so many times as we subscribe to this observable it means that EventFlowSubscriber will
be created for each subscription.
EventFlowSubscriber
- Constructor - here we subscribe to notifier members to be able to lock and set filters
○ Both notifers have debounce operator to make sure that only latest is
delivered.
- _next - overriding base method to
push data to buffer and call method to send data
- deliverNext - this method will send data to subscribers
Components
Components are responsible to handle specific event type and pause or
enable filtering of specific types.
When I started my app I have used setTimeout for delay but later I have
discovered that observable actually has TimerObservable.create which allows to create timeouts and intervals.
this._timer
= TimerObservable.create(2000);
this._eventService
.events
.filter(e => e.type == 'First')
.subscribe(e => {
console.log(e);
this.events.push(e);
this._eventService.eventFlowStopper.next(true);
var subscription =
this._timer.subscribe((n) => {
subscription.unsubscribe();
this._eventService.eventFlowStopper.next(false);
})
});
TimerObservable is cold observable which means that nothing is broadcasted until
it gets subscriber.
Event Service
Case
private _eventsObservable: Observable<EventFlow>
constructor() {
this._eventsObservable = this.PushEvents
.controlledEventFlow(this._eventFlowStopper, this._allowType);
}
Startup
When app is runned two components are subscribed to eventsObservable via
public get events(): Observable<EventFlow> and as I mentioned before two EventFlowSubscriber will be created
because of return source._subscribe(new EventFlowSubscriber(subscriber, this.stopNotifier,
this.allowTypeOnly));
Events
PushEvents.next(new EventFlow("First", new Date()));
PushEvents.next(new
EventFlow("First", new Date()));
PushEvents.next(new EventFlow("Second", new Date()));
Flow
1.
EventFlowSubscriber._next pushed event to buffer and called deliverNext
2. EventFlowSubscriber.deliverNext will deliver event to DisplayFirstTypeComponent because all if statements
are false.
3. After DisplayFirstTypeComponent
received data it sends eventFlowStopper.next(true); this means that now value is true
4. And here is trick since we have 2 subscribers then
EventFlowSubscriber._next is called for DisplaySecondTypeComponent and now this.flowStopper is true and it means
that event is not delivered to component + state of both buffers is different in first subscriber is now -1 item
and second will stay the same.
5. After 2
seconds DisplayFirstTypeComponent will emit false to stopNotifier and First subscriber will deliver next message to
First component components, which will switch back stopNotifier to true, but here is one more important thing is
that Second subscriber will receive stopNotifier as true and then immediately false. Look at time 54.450.
Solution
Well here is pretty easy solution in the service I can create one more
observable where we will put data from controlledEventFlow subscription.
Change code in service to
private _eventFlowStopper = new BehaviorSubject<boolean>(false);
private _allowType = new
BehaviorSubject<string>("");
private _eventsObservable: Subject<EventFlow> = new
Subject<EventFlow>();
constructor() {
this.PushEvents
.controlledEventFlow(this._eventFlowStopper, this._allowType)
.subscribe(ev=> this._eventsObservable.next(ev) );
}
And now I get behaviour I need
Source
code