Most Commonly Used RxJS Operators

  • Cold Observable vs Hot Observable
    • cold:  of, from
    • hot:  Subject, publish, connect, refCount, publishReplay
  • Four Subjects
    • Subject
    • BehaviorSubject
    • AsyncSubject
    • ReplaySubject
      Replays (re-emits) previously emitted values to new subscribers
      // Buffer size 3: each new subscriber first receives up to the last 3 emitted values.
      const sub = new ReplaySubject<number>(3);
      sub.next(1);
      sub.next(2);
      // First subscriber: replays 1, 2 immediately, then also logs 3 and 4 as they are emitted.
      sub.subscribe((value) => console.log(value));
      sub.next(3);
      sub.next(4);
      // Second subscriber: receives only the three buffered values => 2, 3, 4
      sub.subscribe((value) => console.log(value));
                            
  • Maps
    1. map
    2. switchMap
      • It is a flattening operator: it maps each source value to a new inner Observable and emits that inner Observable's values.
      • It can cancel in-flight network requests.
      • It allows only one active inner subscription at a time.
      • It keeps only the inner Observable for the most recent source value, which reduces unnecessary output (and network traffic).
        // someComponent.ts
        /**
         * Builds `validSearch$`: the raw search events are debounced, reduced to
         * the input's text, de-duplicated, stripped of empty strings and finally
         * switched to a user search so only the latest term has a live request.
         */
        ngOnInit() {
          // Text typed by the user, emitted once typing has paused for 1 second.
          const searchTerm$ = this.onSearchUser$.pipe(
            debounceTime(1000),
            map(event => event.target.value),
            distinctUntilChanged(),
            filter(term => term !== "")
          );

          // switchMap drops the previous inner search whenever a new term arrives.
          this.validSearch$ = searchTerm$.pipe(
            switchMap(term => this.rxjsSearchableInputService.searchUser(term))
          );
        }
                          
    3. Differences between switchMap, mergeMap, concatMap, and exhaustMap
      • switchMap

        It maps the most recent value to a new inner Observable and unsubscribes from the previous one.

        output of the following code: 0 0 1 2 3 4

      • mergeMap (formerly flatMap)

        Use it when every request needs to complete. It subscribes to every inner Observable concurrently and lets each one emit its values and complete.

        output of the following code: 01021 03201 43120 42313 42434

      • concatMap

        Use it when the order of subscription to, and emission from, the inner Observables is important: each inner Observable runs only after the previous one completes.

        output of the following code: 01234 01234 01234 01234 01234

      • exhaustMap

        Map to inner observable, ignore other values until that observable completes.

        output of the following code: 01234

      •   const sourceInterval = interval(1000);
          // Outer source: four ticks of sourceInterval, each delayed by 10 ms, merged with a single `true` from of().
          const delayedInterval = sourceInterval.pipe(delay(10), take(4));
          const exhaustSub = merge(
            delayedInterval, of(true)
            )
            // PLace_Name_Of_Map_Here is a placeholder: substitute switchMap, mergeMap, concatMap or exhaustMap
            // to reproduce the outputs listed for each operator above.
            // Every outer value is mapped to an inner stream of the first five ticks of sourceInterval.
            .pipe(PLace_Name_Of_Map_Here((_) => sourceInterval.pipe(take(5))))
            .subscribe((val) => console.log(val));
                              
    4. mapTo

      Maps every emitted value to the same constant value.

      // Emit the same constant every 2 seconds, ignoring the interval's counter value.
      const source = interval(2000);
      // Straight quotes are required: typographic quotes are not valid string delimiters.
      const example = source.pipe(mapTo('Hello World'));
      const subscribe = example.subscribe(val => console.log(val));
      // output (every 2 s): Hello World
                          
  • merge

    turn multiple observables into a single observable

    // Two sources with the same period; merge forwards values from both as they arrive.
    const first = interval(2500);
    const second = interval(2500);
    const example = merge(
      first.pipe(mapTo('first')),
      second.pipe(mapTo('second'))
    );
    // output (every 2.5 s): first, second
    // `first` is subscribed before `second`, so with equal periods its value is logged first.
    const subscribe = example.subscribe(val => console.log(val));
                    
  • skip, timer, interval
  • shareReplay
    Shares a single source subscription and replays the latest values to later subscribers.
    const route = new Subject<string>();
    // Keep the single most recent URL and replay it to every subscriber that joins later.
    const lastUrl = route.pipe(shareReplay({ bufferSize: 1, refCount: false }));

    // An initial subscription is required: shareReplay only connects to `route`
    // once something subscribes, and a Subject does not keep values emitted before that.
    const initialSubscriber = lastUrl.subscribe((url) => console.log(url));
    route.next('my path');

    const lastSubscriber = lastUrl.subscribe((url) => console.log(url)); // output => my path
                    
  • debounce
  • fromEvent

An example about how to use RxJS operators

  • main features:
    1. every 10 seconds
    2. get 30 new users from github server
    3. show a notification on the screen
    4. if user clicks "refresh" button, refresh user list ; if not, go to next cycle
  • source code
    // notification.service.ts
    
    import { HttpClient } from '@angular/common/http';
    import { Injectable } from '@angular/core';
    import { Observable, of, timer } from 'rxjs';
    import { catchError, map, share, switchMap } from 'rxjs/operators';
    import { User } from '../rxjs-cache/user';
    import { shareReplay } from 'rxjs/operators';
    
    const CACHE_SIZE = 1;
    const REFRESH_INTERVAL = 10000; // every 10 seconds
    const API_ENDPOINT = 'https://api.github.com/users?since=';
    
    /**
     * Polls the GitHub users endpoint on a fixed interval and shares the most
     * recent result with every subscriber.
     */
    @Injectable({
      providedIn: 'root',
    })
    export class NotificationService {
      /** Polling stream, created lazily on the first read of `users`. */
      private cacheUsers$?: Observable<User[]>;
      /** Value sent as the `since` query parameter of the next request. */
      private userStartId = 0;

      constructor(private http: HttpClient) {}

      /**
       * Shared stream of user lists.
       *
       * The stream is built once and reused. It requests users immediately and
       * then every REFRESH_INTERVAL milliseconds; `shareReplay(CACHE_SIZE)`
       * replays the latest result(s) to subscribers that join later.
       */
      get users(): Observable<User[]> {
        if (!this.cacheUsers$) {
          // timer(0, REFRESH_INTERVAL): first emission at 0 ms, then one every REFRESH_INTERVAL ms.
          const timer$ = timer(0, REFRESH_INTERVAL);
          this.cacheUsers$ = timer$.pipe(
            // switchMap abandons a still-pending request when the next tick arrives.
            switchMap(() => this.requestUsers()),
            shareReplay(CACHE_SIZE)
          );
        }
        return this.cacheUsers$;
      }

      /**
       * Requests the next batch of users and advances the `since` cursor by 30.
       *
       * @returns the users returned by the API, or an empty list when the
       *          request fails (the error is logged, not rethrown, so the
       *          polling stream keeps running).
       */
      private requestUsers(): Observable<User[]> {
        // Use the current cursor for this request and advance it afterwards,
        // so the very first poll starts at since=0 instead of skipping to 30.
        const since = this.userStartId;
        this.userStartId += 30;

        // The type argument makes the response assignable to Observable<User[]>.
        return this.http.get<User[]>(API_ENDPOINT + since).pipe(
          catchError((error: unknown) => {
            // Pass the error as its own argument so it is not flattened to "[object Object]".
            console.error('something went wrong', error);
            const noUsers: User[] = [];
            return of(noUsers);
          })
        );
      }
    }
    
                    
    // notification.component.ts
    import { Component, OnInit } from '@angular/core';
    import { NotificationService } from './notification.service';
    import { merge, Observable, Subject } from 'rxjs';
    import { User } from '../rxjs-cache/user';
    import { mapTo, mergeMap, skip, take } from 'rxjs/operators';
    import { ThisReceiver } from '@angular/compiler';
    
    /**
     * Shows the current user list plus a notification banner that appears
     * whenever the service has delivered a newer list and disappears once the
     * user clicks "Update".
     */
    @Component({
      selector: 'app-notification',
      templateUrl: './notification.component.html',
      styleUrls: ['./notification.component.css'],
    })
    export class NotificationComponent implements OnInit {
      /** User list rendered by the template. */
      users$!: Observable<User[]>;
      /** Emits each time the "Update" button is clicked. */
      updateClick$ = new Subject<void>();
      /** true => show the banner, false => hide it. */
      showNotification$?: Observable<boolean>;

      constructor(private notService: NotificationService) {}

      ngOnInit(): void {
        // Users: one snapshot up front, then a fresh snapshot for every click.
        this.users$ = merge(
          this.getUserOnce(),
          this.updateClick$.pipe(mergeMap(() => this.getUserOnce()))
        );

        // Banner: switched on by every later emission of the service stream,
        // switched off by a click (the template calls updateClick$.next()).
        this.showNotification$ = merge(
          this.getNotifications().pipe(mapTo(true)),
          this.updateClick$.pipe(mapTo(false))
        );
      }

      /** Takes a single value from the service's user stream and completes. */
      getUserOnce() {
        const { users } = this.notService;
        return users.pipe(take(1));
      }

      /** Service user stream without the first value this subscription receives. */
      getNotifications() {
        const { users } = this.notService;
        return users.pipe(skip(1));
      }
    }
                  
    // notification.component.html
    
    <!-- Notification banner (visible while showNotification$ emits true) followed by the user cards. -->
    <div class="container" style="margin-top: 30px; width: 40%">
      <div class="row justify-content-md-center" *ngIf="showNotification$ | async">
        <div>
          <strong>Warning!</strong> New user info available, please click to update
          <button
            type="button"
            style="margin-left: 20px"
            class="btn btn-warning"
            (click)="updateClick$.next()"
          >
            Update
          </button>
        </div>
      </div>

      <div class="row justify-content-md-center">
        <div
          style="margin: 10px"
          class="card w-100"
          *ngFor="let user of users$ | async"
        >
          <div class="card-body">
            <h5 class="card-title">User Name:{{ user.login }}</h5>
            <p class="card-text"><strong>Github URL:</strong> {{ user.url }}</p>
          </div>
        </div>
      </div>
    </div>
    
                  
  • source code is hosted on github RxJS notification