Most Commonly Used RxJS Operators
-
Cold Observable vs Hot Observable
- cold: of, from
- hot: Subject, publish, connect, refCount, publishReplay
-
Four Subjects
- Subject
- BehaviorSubject
- AsyncSubject
- ReplaySubject
Replays (re-emits) previously emitted values to new subscribers
// The constructor argument is the buffer size: each new subscriber is first
// sent up to the 3 most recent values, then receives live values as usual.
const sub = new ReplaySubject(3);

sub.next(1);
sub.next(2);
// First subscriber: only two values have been emitted so far, so both are replayed.
sub.subscribe((value) => console.log(value)); // output => 1,2

sub.next(3);
sub.next(4);
// Second subscriber: four values were emitted, only the last three are replayed.
sub.subscribe((value) => console.log(value)); // output => 2,3,4
-
Maps
- map
- switchMap
- It is a flattening operator: it maps each source value to a new inner Observable and emits that inner Observable's values.
- It can cancel in-flight network requests.
- It allows only one active inner subscription.
- It only maps the most recent value at a time, which reduces unnecessary output (and network traffic).
// someComponent.ts
/**
 * Builds the search stream: waits for the user to stop typing, reads the
 * input's text, drops repeats and empty strings, then queries the service.
 */
ngOnInit() {
  this.validSearch$ = this.onSearchUser$.pipe(
    // Wait until no new event has arrived for 1 second.
    debounceTime(1000),
    // event.target is only known to be an EventTarget; assert the input element to read `.value`.
    map(event => (event.target as HTMLInputElement).value),
    // Skip the request when the text equals the previous one.
    distinctUntilChanged(),
    filter(input => input !== ""),
    // A newer search term unsubscribes from the previous, still pending, search.
    switchMap(data => this.rxjsSearchableInputService.searchUser(data))
  );
}
- differences between SwitchMap,mergeMap,ConcatMap,exhaustMap
- switchMap
It maps only the most recent source value to a new inner Observable.
output of the following code: 0 0 1 2 3 4
- mergeMap (flatMap)
If every request needs to complete, it is the correct option. It ensures that every inner observable emits its values and completes.
output of the following code: 01021 03201 43120 42313 42434
- concatMap
Use it if the order of emission and subscription of the inner observables is important.
output of the following code: 01234 01234 01234 01234 01234
- exhaustMap
Map to inner observable, ignore other values until that observable completes.
output of the following code: 01234
- SwitchMap
// Comparison harness: substitute the mapping operator under test for
// PLace_Name_Of_Map_Here and compare the logged values.
const sourceInterval = interval(1000);
const delayedInterval = sourceInterval.pipe(delay(10), take(4));

// Outer stream: the delayed interval values plus one immediate `true`.
const outer$ = merge(delayedInterval, of(true));

// Every outer value is projected to the first five ticks of sourceInterval.
const toInner = (_: unknown) => sourceInterval.pipe(take(5));

const exhaustSub = outer$
  .pipe(PLace_Name_Of_Map_Here(toInner))
  .subscribe((val) => console.log(val));
Map every emission to a constant value (mapTo)
// Emit a counter every 2 seconds and replace each value with a constant string.
const source = interval(2000);
const example = source.pipe(mapTo('Hello World'));
const subscribe = example.subscribe(val => console.log(val)); // output: Hello World
turn multiple observables into a single observable
// Two interval sources, each mapped to a label so the merged output shows its origin.
const first = interval(2500);
const second = interval(2500);
const example = merge(
  first.pipe(mapTo('first')),
  second.pipe(mapTo('second'))
);
// output: first, second (both sources use the same period, so each round
// is expected to arrive in subscription order)
const subscribe = example.subscribe(val => console.log(val));
share and replay values for later subscribers
const route = new Subject<string>();
// Cache the single most recent value for subscribers that arrive later.
const lastUrl = route.pipe(shareReplay(1));

// requires initial subscription
const initialSubscriber = lastUrl.subscribe((url) => console.log(url));

route.next('my path');

// Subscribes after the emission and still receives the cached value.
const lastSubscriber = lastUrl.subscribe((url) => console.log(url)); // output => my path
An example about how to use RxJS operators
- main features:
- every 10 seconds
- get 30 new users from github server
- show a notification on the screen
- if user clicks "refresh" button, refresh user list ; if not, go to next cycle
- source code
// notification.service.ts
import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Observable, of, timer } from 'rxjs';
import { catchError, map, share, switchMap } from 'rxjs/operators';
import { User } from '../rxjs-cache/user';
import { shareReplay } from 'rxjs/operators';

const CACHE_SIZE = 1;
const REFRESH_INTERVAL = 10000; // every 10 seconds
const API_ENDPOINT = 'https://api.github.com/users?since=';

/**
 * Polls the GitHub users endpoint on a fixed interval and shares the most
 * recent result with every subscriber.
 */
@Injectable({
  providedIn: 'root',
})
export class NotificationService {
  /** Lazily created polling stream; built on first access of `users`. */
  private cacheUsers$?: Observable<User[]>;
  /** Value appended to the `since=` query; advanced by 30 before every request. */
  private userStartId = 0;

  constructor(private http: HttpClient) {}

  /**
   * Shared stream of user pages.
   *
   * The stream is created once and reused, so all callers share one timer.
   *
   * @returns an observable emitting the latest user list on each poll
   */
  get users(): Observable<User[]> {
    if (!this.cacheUsers$) {
      // at 0 second, emit the first data; at every REFRESH_INTERVAL, emit the following data
      const timer$ = timer(0, REFRESH_INTERVAL);
      this.cacheUsers$ = timer$.pipe(
        // A new tick unsubscribes from a request that is still pending.
        switchMap(() => this.requestUsers()),
        // Replay the last CACHE_SIZE result(s) to late subscribers.
        shareReplay(CACHE_SIZE)
      );
    }
    return this.cacheUsers$;
  }

  /**
   * Requests the next page of users.
   *
   * The start id is incremented before the request is built, so the first
   * request is sent with `since=30`.
   *
   * @returns the response body, or an empty list when the request fails
   */
  private requestUsers(): Observable<User[]> {
    this.userStartId = this.userStartId + 30;
    return this.http.get<User[]>(API_ENDPOINT + this.userStartId).pipe(
      catchError((error) => {
        // Pass the error as its own argument so its details are not lost to string concatenation.
        console.error('something went wrong', error);
        // Fall back to an empty list so the polling stream keeps running.
        return of([]);
      })
    );
  }
}

// notification.component.ts
import { Component, OnInit } from '@angular/core';
import { NotificationService } from './notification.service';
import { merge, Observable, Subject } from 'rxjs';
import { User } from '../rxjs-cache/user';
import { mapTo, mergeMap, skip, take } from 'rxjs/operators';
import { ThisReceiver } from '@angular/compiler';

/**
 * Shows the user list and a notification banner whenever a newer list has
 * been fetched; the list itself only changes when the user clicks the button.
 */
@Component({
  selector: 'app-notification',
  templateUrl: './notification.component.html',
  styleUrls: ['./notification.component.css'],
})
export class NotificationComponent implements OnInit {
  /** Users currently rendered by the template. */
  users$!: Observable<User[]>;
  /** Emits when the button in the template is clicked. */
  updateClick$ = new Subject<void>();
  /** `true` shows the banner, `false` hides it. */
  showNotification$?: Observable<boolean>;

  constructor(private notService: NotificationService) {}

  /** Wires the displayed list and the banner visibility streams. */
  ngOnInit(): void {
    const initialUsers$ = this.getUserOnce();
    // On every click, take the latest cached list exactly once.
    const updateUsers$ = this.updateClick$.pipe(
      mergeMap(() => this.getUserOnce())
    );
    this.users$ = merge(initialUsers$, updateUsers$);

    const initNotification$ = this.getNotifications();
    const show$ = initNotification$.pipe(mapTo(true));
    const hide$ = this.updateClick$.pipe(mapTo(false));
    // output => true,false; if click "refresh" button, execute updateClick$.next()
    this.showNotification$ = merge(show$, hide$);
  }

  /**
   * @returns a stream that emits a single user list from the service and completes
   */
  getUserOnce(): Observable<User[]> {
    return this.notService.users.pipe(take(1));
  }

  /**
   * @returns the service stream without its first emission, so only later
   *   refreshes are reported
   */
  getNotifications(): Observable<User[]> {
    return this.notService.users.pipe(skip(1));
  }
}
// notification.component.html
<div class="container" style="margin-top: 30px; width: 40%">
  <!-- Banner: rendered only while showNotification$ emits a truthy value -->
  <div class="row justify-content-md-center" *ngIf="showNotification$ | async">
    <div>
      <strong>Warning!</strong> New user info available, please click to update
      <button
        type="button"
        style="margin-left: 20px"
        class="btn btn-warning"
        (click)="updateClick$.next()"
      >
        Update
      </button>
    </div>
  </div>
  <!-- User list: one card per entry of the latest users$ emission -->
  <div class="row justify-content-md-center">
    <div
      style="margin: 10px"
      class="card w-100"
      *ngFor="let user of users$ | async"
    >
      <div class="card-body">
        <h5 class="card-title">User Name:{{ user.login }}</h5>
        <p class="card-text"><strong>Github URL:</strong> {{ user.url }}</p>
      </div>
    </div>
  </div>
</div>
- source code is hosted on github RxJS notification