Aller au contenu

Tout ce que je sais sur RxJS

L’autre jour un membre de ma communautĂ© Discord a posĂ© une question trĂšs intĂ©ressante

pourquoi et quand utiliser un behaviorSubject au lieu d'un Observable?

Je lui ai Ă©crit une rĂ©ponse qui m’a pris 45min Ă  rĂ©diger tant cette question implique beaucoup de choses. Et ça m’a donnĂ© l’envie de me lancer dans un article sur un sujet que j’aime : RxJS. Et la question Observable vs Subject est un bel angle par lequel Ă©tudier tout cela.

Alors faisons un grand plongeon dans RxJS pour essayer de démystifier de maniÚre la plus simple possible tous les rouages de cette librairie.


RxJS, ça devrait vous ĂȘtre familier. C’est une librairie qui permet de gĂ©rer l’asynchronicitĂ© des Ă©vĂšnements et des donnĂ©es au sein d’une application Javascript, et RxJS est largement utilisĂ© dans Angular.

Et mĂȘme si les Signal vont probablement rĂ©duire l’utilisation de la librairie, ce n’est pas prĂ©vu de si tĂŽt et ça ne sera pas complĂštement le cas, alors faisons un point sur une question fondamentale qui va nous permettre de comprendre RxJS dans son intĂ©gralitĂ© : c’est quoi la diffĂ©rence entre un Observable et un Subject ?

Avant de parler de leurs différences, parlons de leurs ressemblances.

import { Observable, Subject } from "rxjs";

const obs = new Observable();
const sub = new Subject();

Ce qu’on peut voir, c’est que ce sont tous les deux des class que l’on peut instancier.

import { Observable, Subject } from "rxjs";

const obs = new Observable();
const sub = new Subject();

obs.subscribe((data) => console.log(`Obs ${data}`));
sub.subscribe((data) => console.log(`Sub ${data}`));

Mais l’intĂ©rĂȘt de ces deux class, c’est surtout qu’on peut s’abonner dessus avec la fonction subscribe. “S’abonner” veut dire qu’à l’instant oĂč une donnĂ©e va ĂȘtre poussĂ©e dans l’Observable ou le Subject alors ces derniers vont Ă©mettre cette donnĂ©e, ainsi chaque abonnĂ© va recevoir cette nouvelle donnĂ©e immĂ©diatement et ce qu’il y a dans le .subscribe() va se trigger.

On peut donc en conclure que les Observable et Subject permettent de crĂ©er des flux de donnĂ©es, ce flux est continu, il ne s’interrompt pas (sauf si on lui dit explicitement). Et pour ouvrir ce flux de donnĂ©es, il faut s’abonner en utilisant .subscribe().

Dans mon exemple je me suis abonnĂ© Ă  la fois Ă  obs et sub, ainsi dĂšs qu’une donnĂ©e va ĂȘtre poussĂ©e dans l’un ou l’autre alors le console.log va s’exĂ©cuter.

Mais pas avant !

Ce que je veux dire par lĂ , c’est que si j’exĂ©cute ce script, rien ne va s’afficher. Pourquoi ? Parce qu’à aucun moment je n’ai poussĂ© de nouvelle donnĂ©e. Mes deux abonnements sont faits, les flux sont ouverts, mais Ă©tant donnĂ© que je n’ai pas encore poussĂ© de donnĂ©e alors obs ou sub n’émettent rien, donc je ne rentre pas dans le callback du tout.

En rĂ©sumĂ©, voilĂ  comment le dĂ©roulĂ© de “vie” d’un Observable (ça marche pareil pour un Subject)

  1. Je crĂ©Ă© l’Observable avec const obs = new Observable();
  2. Dans divers endroits de mon application je m’abonne à mon Observable en faisant obs.subscribe(...)
  3. DĂšs qu’une donnĂ©e est poussĂ©e dans l’ Observable (on va voir comment juste aprĂšs) alors obs va l’émettre, en d’autres termes il va la pousser dans son flux
  4. Les abonnées vont recevoir cette donnée car ils écoutent le flux et pourront faire des trucs divers et variés avec cette donnée

.next() pour pousser des données

import { Subject } from "rxjs";

const sub = new Subject();

sub.subscribe((data) => console.log(`Sub ${data}`));

sub.next("Salut !");

Mesdames et messieurs, devant aux yeux Ă©bahis je viens de pousser une donnĂ©e dans sub ! 👏

Maintenant, que se passe t-il si j’exĂ©cute ce code ? Et bien vous l’aurez devinĂ©, je verrais Sub Salut ! qui provient du console.log("Sub ${data}"). Tout simplement parce que lorsque j’ai fait sub.next('Salut !), j’ai poussĂ© une nouvelle donnĂ©e dans le Subject et ce dernier l’a Ă©mise, ainsi tous les abonnĂ©s l’ont reçues.

Je dis bien “tous les abonnĂ©s” car je peux m’abonner plusieurs fois :

import { Subject } from "rxjs";

const sub = new Subject();

sub.subscribe((data) => console.log(`Sub1 ${data}`));
sub.subscribe((data) => console.log(`Sub2 ${data}`));
sub.subscribe((data) => console.log(`Sub3 ${data}`));

sub.next("Salut !");

Ainsi, tous les abonnĂ©s vont recevoir cette mĂȘme valeur. A noter que si j’avais mis le sub.next('Salut !'); avant les subscribe() alors les abonnĂ©s n’auraient reçu aucune donnĂ©e car la valeur aurait Ă©tĂ© Ă©mise avant un quelconque abonnement.

Utiliser les Subject dans Angular

Mais alors à quoi ça sert ? Comment tirer profit de cela ?

Et bien RxJS est trĂšs utile dans le monde JS pour gĂ©rer des Ă©vĂšnements de maniĂšres isolĂ©es. Par exemple si je veux que le clique d’un bouton mette Ă  jour un compteur et que la valeur de ce compteur soit affichĂ© dans plusieurs composants diffĂ©rents au sein de mon application, je peux utiliser un Subject. Ce Subject je vais le crĂ©er dans un fichier Ă  part et pas directement dans un composant car sa valeur n’appartient Ă  aucun composant en particulier. Et ce fichier, dans le cas de Angular, sera un service.

@Injectable({ providedIn: "root" })
export class CounterService {
  readonly count = new Subject<number>();

  constructor() {
    this.count.next(0); // j'initialise la valeur Ă  0
  }

  increment() {
    // this.count.next(???); que mettre ici ?
  }
}

Attendez une minute
 Comment on fait pour rĂ©cupĂ©rer la valeur actuelle de count pour l’incrĂ©menter ?

BehaviorSubject Ă  la rescousse !

import { BehaviorSubject } from "rxjs";

@Injectable({ providedIn: "root" })
export class CounterService {
  readonly count = new BehaviorSubject<number>(0);

  increment() {
    this.count.next(this.count.value + 1);
  }
}

BehaviorSubject c’est une autre class qui vient de RxJS et qui est trùs similaire à Subject, sauf que :

  • BehaviorSubject a une notion de “valeur courante”. Quand un BehaviorSubject Ă©met une donnĂ©e, il va Ă©galement l’enregistrer en son sein. Par consĂ©quent, on a accĂšs Ă  la propriĂ©tĂ© .value qui va nous renvoyer cette valeur
  • Quand on initialise un BehaviorSubject on doit lui passer une valeur initiale, c’est exactement ce que je fais en faisant readonly count = new BehaviorSubject<number>(0). Du coup, plus besoin de faire le this.count.next(0) dans le constructor !

PlutĂŽt pas mal non ? Je peux maintenant utiliser les valeurs Ă©mient par count en m’abonnant Ă  celui-ci directement dans mes composants et Ă©galement pousser une valeur incrĂ©mentĂ©e :

@Component({
  standalone: true,
  template: `
    <p>Counter: {{ count }}</p>
    <button (click)="counterService.increment()">+1</button>
  `,
})
export class SomeComponent implements OnInit {
  readonly countService = inject(CounterService);
  count!: number;

  ngOnInit() {
    this.countService.count.subscribe((value) => (this.count = value));
  }
}

Je peux implĂ©menter cette fonctionnalitĂ© partout oĂč je veux dans mon application, certains autres composants pourraient utiliser une fonction decrement, ou simplement afficher la valeur de count. J’ai une application oĂč la separation of concerns est respectĂ©e ! Le CounterService crĂ©Ă© est ce qu’on appelle un “store” : une source de vĂ©ritĂ© qui contient mes donnĂ©es, le moyen d’accĂ©der Ă  ces donnĂ©es par d’autres briques de mon application, et les mĂ©thodes qui modifient ces donnĂ©es.

En bref, utilisez BehaviorSubject lorsque vous voulez crĂ©Ă© un store manuellement sans librairie tiĂšrce ! C’est d’ailleurs ce qu’utilise NgRx sous la capot.

Mais
 Mon implĂ©mentation actuel est loin d’ĂȘtre parfaite ! AmĂ©liorons tout ça en introduisant une notion : les Observable.

import { Observable } from "rxjs";

@Injectable({ providedIn: "root" })
export class CounterService {
  private readonly count = new BehaviorSubject<number>(0);
  readonly count$ = this.count.asObservable();

  increment() {
    this.count.next(this.count.value + 1);
  }
}

Ici il y a deux différences :

  • J’ai crĂ©Ă© une propriĂ©tĂ© publique count$ qui est Ă©gale Ă  this.count.asObservable()
  • J’ai mis le BehaviorSubject en privĂ©

count$ est donc un Observable grĂące Ă  .asObservable(). Un Observable est un flux de donnĂ©es tout comme un Subject ce qui signifie qu’on peut faire un subscribe() dessus. Mais la grande diffĂ©rence est la suivante : sur un Observable on ne peut pas faire de .next().

Un Observable est comme un Subject mais read-only, on ne peut pas lui pousser des donnĂ©es. C’est donc trĂšs utile pour exposer aux autres parties de mon application des donnĂ©es sans leurs donner la possibilitĂ© de les modifier directement car cette responsabilitĂ© incombe au service oĂč est dĂ©clarĂ©e ma donnĂ©e.

Je peux donc changer le code de mon composant afin d’utiliser count$, j’en profite le rendre plus optimal en utilisant le pipe async qui a pour but subscribe automatiquement et de unsubscribe quand mon composant est dĂ©truit (par exemple quand je change de page) :

@Component({
  standalone: true,
  template: `
    <p>Counter: {{ countService.count$ | async }}</p>
    <button (click)="counterService.increment()">increment</button>
  `,
})
export class SomeComponent {
  readonly countService = inject(CounterService);
}

Comment fonctionne les Observable

Allons plus loin dans la dĂ©couverte des Observable en n’en crĂ©ant un.

import { Observable } from "rxjs";

const obs = new Observable();

Comme je le disais plus haut, cet obs ne fait rien, en rĂ©alitĂ© on ne crĂ©Ă© jamais d’Observable comme ça car un Observable attend une fonction en callback :

import { Observable } from "rxjs";

const obs = new Observable((subscriber) => {
  subscriber.next("Salut !");
});

obs.subscribe((data) => console.log(data)); // Salut !

C’est en rĂ©alitĂ© dans le callback d’un Observable que l’on doit faire notre .next(), et on le fait sur l’argument subscriber qui correspond en rĂ©alitĂ© au flux de donnĂ©es que chaque abonnĂ© Ă©coutera lorsqu’il fera .subscribe().

import { Observable } from "rxjs";

const obs = new Observable((subscriber) => {
  subscriber.next("Salut ");
  subscriber.next("comment ");
  subscriber.next("ça va ?");
});

obs.subscribe((data) => console.log(data));

Ici je vais bel et bien avoir 3 console.log() comme ceci :

Salut ! // premier console.log
comment // deuxiĂšme console.log
ça va ? // troisiÚme console.log

Mon Observable, et plus particuliĂšrement son callback, fait 3 .next() sur le flux de donnĂ©es, poussant donc 3 valeurs, ainsi au moment oĂč je vais subscribe() le flux va Ă©mettre trois fois, donc je passerai 3 fois Ă  l’intĂ©rieur du subscribe().

Il est important de comprendre que c’est le fait de .subscribe() qui dĂ©clenche l’ouverture du flux et donc qui trigger le callback passĂ© dans l’Observable. C’est exactement comme le http.get() (et les autres requĂȘtes) du HttpClient de Angular, si je ne subscribe() pas, le call http ne se lance pas. A chaque fois qu’on va subscribe() ça dĂ©clenchera un nouveau call http et crĂ©era un nouveau flux indĂ©pendant.

Observable et valeurs dynamiques

Les Observable ne vont pas systématiquement pousser des données statiques !

import { Observable } from "rxjs";

const rand = new Observable((subscriber) => {
  subscriber.next(Math.random());
});

rand.subscribe((data) => console.log(data)); // 0.159812
rand.subscribe((data) => console.log(data)); // 0.811699
rand.subscribe((data) => console.log(data)); // 0.422368

Comme vous le voyez, la donnĂ©e que l’on pousse dans le .next() n’est pas forcĂ©ment une valeur figĂ©e. Ici c’est Math.random() que je pousse. De ce fait Ă  chaque subscribe() le Math.random() va ĂȘtre executĂ©, on aura donc une valeur diffĂ©rente.

Un autre exemple :

const counter = new Observable((subscriber) => {
  let counter = 0;
  setInterval(() => {
    subscriber.next(counter++);
  }, 1000);
});

counter.subscribe((data) => console.log(data)); // Ă©met toutes les secondes

// 5 secondes plus tard je m'abonne Ă  nouveau
setTimeout(() => {
  counter.subscribe((data) => console.log(data)); // Ă©met toutes les secondes
}, 5000);

counter dĂ©clenche un intervalle oĂč chaque seconde une nouvelle donnĂ©e est poussĂ©e, donc chaque abonnement va crĂ©er un nouvel intervalle. J’ai fait dĂ©marrer le second abonnement 5 secondes plus tard juste pour vous montrer que les flux sont vraiment indĂ©pendants.

Et c’est une diffĂ©rence fondamentale entre les Observable et les Subject !

  • Avec les Observables, chaque abonnĂ© crĂ©Ă© un flux de donnĂ©es indĂ©pendant entre lui et l’Observable dĂšs qu’il .subscribe(). On dit alors que les Observable font du unicasting.
  • Avec les Subject, tous les abonnĂ©s se branchent au mĂȘme flux de donnĂ©es en faisant un .subscribe() et dĂšs que le Subject Ă©met suite Ă  un .next() alors les abonnĂ©s vont tous recevoir cette nouvelle valeur. On dit alors que les Subjectfont du multicasting.

Et cette diffĂ©rence repose uniquement sur la question de l’endroit oĂč se fait le .next(), et en rĂ©alitĂ© c’est tout Ă  fait logique, regardez cet exemple :

import { Observable, Subject } from "rxjs";

const randObs = new Observable((subscriber) => {
  subscriber.next(Math.random());
});

randObs.subscribe(console.log);
randObs.subscribe(console.log);
randObs.subscribe(console.log);

const randSub = new Subject();

randSub.subscribe(console.log);
randSub.subscribe(console.log);
randSub.subscribe(console.log);

randSub.next(Math.random());

Les trois abonnĂ©s sur randObs sont complĂštement indĂ©pendant, et dĂšs qu’ils ont subscribe() ils ont crĂ©Ă© un nouveau flux et relancer un nouveau Math.random(), donc ils n’ont pas reçu la mĂȘme valeur !

Tandis que pour le Subject, j’ai d’abord crĂ©Ă© trois abonnements puis je pousse un nouveau Math.random() donc les trois abonnements vont bien recevoir la mĂȘme chose !

C’est Ă©galement le cas avec un BehaviorSubject :

const randSub = new BehaviorSubject(Math.random());

randSub.subscribe(console.log);
randSub.subscribe(console.log);
randSub.subscribe(console.log);

Les trois abonnĂ©s vont recevoir la mĂȘme donnĂ©e initiale, pourquoi ? Et bien tout simplement parce qu’un BehaviorSubject exĂ©cute ce qu’on lui passe en paramĂštre dĂšs qu’il se crĂ©Ă©.

Le parallĂšle avec Angular

Bon, c’est intĂ©ressant tout ça, mais comment ça s’inscrit dans la logique de crĂ©ation d’un application Angular ?

En fait, vous n’allez quasiment jamais faire de new Observable, en tout cas personnellement en 7 ans d’utilisation d’Angular je ne l’ai pas fait une seule fois. Tandis que new Subject ou new BehaviorSubject est relativement commun (moins si vous utilisez une librairie de State Management car celles-ci l’utilisent dĂ©jĂ  sous le capot), ça vous servira Ă  crĂ©er ce pattern oĂč vous aurez une donnĂ©e indĂ©pendante que vous pourrez lire et Ă©crire depuis n’importe quel composant ou service en respectant une separation of concerns.

En revanche, vous allez beaucoup utiliser les Observables mis Ă  disposition par Angular :

  • Le HttpClient et ses requĂȘtes qui renvoient des Observables
  • ActivatedRoute qui permet de s’abonner Ă  la route courante
  • valueChanges pour Ă©couter les changements de valeurs des diffĂ©rents champs d’un formulaire
  • Et bien d’autres

D’ailleurs on peut facilement imaginer ce Ă  quoi pourrait ressembler l’implĂ©mentation de http.get() de Angular :

const httpGet = (endpoint: string) => new Observable(subscriber => {
  // ici il y a la requĂȘtte HTTP avec xhr en
  // utilisant l'endpoint en paramĂštre
  const resultFromHttpRequest = ...;
  subscriber.next(dataFromHttpRequest);
  subscriber.complete()
})

Bien entendu la vraie implĂ©mentation de la fonction est plus complexe car on gĂšre les erreurs, les side effects, le HttpHeader etc, mais dans l’idĂ©e c’est bien ça ! Le subscriber.complete() sert Ă  couper le flux de tous les abonnĂ©s, dans les requĂȘtes de HttpClient c’est fait automatiquement.

D’ailleurs en parlant de ça, subscribe() ne prend pas forcĂ©ment qu’une fonction callback, on peut aussi lui passer un objet pour prĂ©ciser quoi faire en cas d’erreur ou lors du complete (quand le flux est coupĂ©) :

const obs = new Observable((subscriber) => {
  try {
    subscriber.next(Math.random());
    subscriber.complete();
  } catch (error) {
    throw new Error(error);
  }
});

obs.subscribe({
  next: (data) => console.log(data),
  error: (error) => console.error(error),
  complete: () => console.log("complete"),
});
  • next() va s’exĂ©cuter Ă  chaque Ă©mission de nouvelle donnĂ©e
  • error va s’exĂ©cuter quand obs va lever une erreur
  • complete va s’exĂ©cuter quand le flux va se couper

Les opérateurs

Ces outils puissants (et parfois difficiles Ă  maĂźtriser) vous permettront d’apprĂ©hender la programmation rĂ©active au sein de vos applications Angular. Pour guider la suite de cette article, nous tenterons de rĂ©pondre Ă  deux questions :

  1. Qu’est-ce qu’un opĂ©rateur RxJS?
  2. Quels sont les opérateurs les plus utilisés et pour quelles raisons ?
Qu’est-ce qu’un opĂ©rateur RxJS ?

Comme vous le savez dĂ©jĂ , un Observable est un mĂ©canisme de gestion de flux de donnĂ©es. Chaque fois qu’une donnĂ©e est injectĂ©e dans un Observable (via une mĂ©thode .next()), cette donnĂ©e est Ă©mise Ă  tous les abonnĂ©s (via un subscribe() ou | async).

C’est lĂ  qu’interviennent les opĂ©rateurs. Leur rĂŽle est d’intervenir dans le flux de donnĂ©es et de faire quelque chose Ă  chaque Ă©mission, par exemple transformer la donnĂ©e, la filtrer, et bien d’autres encore.

Il existe également des opérateurs qui vont créer des Observables à partir de rien, ceux-là ne vont donc pas modifier les valeurs émient mais bel et bien construire leurs propres flux.

Il existe donc deux grandes catĂ©gories d’opĂ©rateurs :

  1. les opérateurs de réaction, car ils font quelque chose à chaque émission de donnée
  2. les opérateurs de création, car ils permettent de créer des Observables

Voici un exemple qui illustre comment utiliser les opérateurs dans un contexte Angular :

import { interval } from "rxjs";

@Component({
  template: `Compteur : {{ counter$ | async }}`,
})
export class AppComponent {
  // va Ă©mettre 0..1..2..3..4.. chaque seconde dĂšs qu'on s'abonne
  readonly counter$ = interval(1000);
}

Dans cet exemple, grĂące Ă  l’opĂ©rateur de crĂ©ation interval(), j’ai crĂ©e un Observable qui va Ă©mettre 1, puis 2, puis 3, puis 4 etc chaque seconde (car j’ai prĂ©cisĂ© 1000 dans ses paramĂštres) dĂšs lors qu’on s’abonne dessus, c’est donc utile pour faire des compteurs.

Maintenant, admettons que j’ai envie de doubler la valeur de chacune des Ă©missions. Je pourrais faire comme ça :

import { interval } from "rxjs";

@Component({
  template: `Compteur : {{ counter }}`,
})
export class AppComponent {
  doubleCounter = 0;
  readonly counter$ = interval(1000).subscribe(
    (value) => (this.doubleCounter = value * 2)
  );
}

Cela marcherait trĂšs bien, mais imaginez que j’ai besoin de ce compteur doublĂ© Ă  plusieurs endroits dans mon application, je devrais refaire ça Ă  chaque fois, donc pas top pour d’éventuelles Ă©volutions ou refacto et en plus il faut penser au unsubscribe.

Et c’est exactement lĂ  que les opĂ©rateurs entrent en jeu.

import { interval, map } from "rxjs";

@Component({
  template: `Compteur : {{ doubleCounter$ | async }}`,
})
export class AppComponent {
  // va Ă©mettre 0..2..4..6..8.., chaque seconde
  readonly doubleCounter$ = interval(1000).pipe(map((value) => value * 2));
}

Les opĂ©rateurs se mettent dans myObs$.pipe(...), il faut vraiment le voir comme une opĂ©ration qui se dĂ©clenche Ă  chaque Ă©mission. Ici, j’utilise l’opĂ©rateur map qui a pour but de transformer la donnĂ©e Ă  chaque Ă©mission, et lĂ  en l’occurrence je double sa valeur.

Et ce qui est formidable (et trĂšs important), c’est qu’on peut enchainer les opĂ©rateurs dans la fonction pipe.

import { interval, map } from "rxjs";

@Component({
  template: `Compteur : {{ doubleCounter$ | async }}`,
})
export class AppComponent {
  readonly doubleCounter$ = interval(1000).pipe(
    tap((value) => console.log(value)), // 0..1..2..3..
    map((value) => value * 2), // double la donnée à chaque émission
    tap((value) => console.log(value)), // 0..2..4..6..
    filter((value) => value > 10) // bloque les émissions si inférieur à 10
  );
}

Ici on fait pleins de trucs :

  • tap a pour but d’effectuer des effets de bords, c’est Ă  dire qu’on ne modifie pas le flux actuel, mais on y a accĂšs quand mĂȘme. Ici je l’utilise deux fois, la premiĂšre fois pour faire un console.log AVANT le map et la seconde fois APRES. C’est trĂšs utile pour se rendre compte de ce qu’émet notre Observable.
  • map modifie chaque Ă©mission, comme dit prĂ©cĂ©demment.
  • filter qui va bloquer les Ă©missions si la valeur renvoyĂ©e par la fonction est false. Ici donc l’Observable ne va Ă©mettre que si la valeur est plus grande que 10.

Les opĂ©rateurs sont donc un excellent moyen de contrĂŽler nos flux de donnĂ©es ! C’est difficile de se rendre compte de la puissance des opĂ©rateurs donc je vous invite Ă  essayer par vous-mĂȘme sur ce lien Stackblitz.

Mais à quoi ça sert tout ça ?

“Je pourrais faire la mĂȘme chose avec une fonction qui utilise un setInterval et quelques if non ?”

C’est vrai, mais RxJS vous ouvre les voies de la programmation rĂ©active et dĂ©clarative. C’est Ă  dire que vous dĂ©clarez ce qui doit se passer en rĂ©ponse Ă  certains Ă©vĂ©nements, sans avoir Ă  vous soucier des dĂ©tails de mise en Ɠuvre.

Avec setInterval, vous devez gĂ©rer manuellement les cas oĂč vous souhaitez arrĂȘter l’intervalle, reprendre l’intervalle ou gĂ©rer des erreurs.

Par contre, avec l’opĂ©rateur interval de RxJS, vous obtenez un Observable qui Ă©met des Ă©vĂ©nements Ă  un intervalle rĂ©gulier. Cet Observable peut ĂȘtre facilement composĂ© avec d’autres Observables, manipulĂ© avec d’autres opĂ©rateurs, ou arrĂȘtĂ© ou repris Ă  tout moment. En cas d’erreur, vous pouvez gĂ©rer cela de maniĂšre dĂ©clarative avec les opĂ©rateurs d’erreur.

Le véritable pouvoir de RxJS réside dans la combinaison de ces opérateurs pour créer des flux de données complexes et gérables de maniÚre trÚs lisible et maintenable.

Par exemple, imaginez que vous souhaitez Ă©mettre un Ă©vĂ©nement toutes les secondes, mais seulement pendant les 10 premiĂšres secondes. Avec setInterval, vous devriez mettre en place un compteur, un if pour vĂ©rifier le compteur, et un clearInterval pour arrĂȘter l’émission. Avec RxJS, vous pouvez simplement combiner les opĂ©rateurs interval et take :

import { interval } from "rxjs";
import { take } from "rxjs/operators";

interval(1000).pipe(take(10)).subscribe(console.log);

Cela Ă©mettra un nombre croissant chaque seconde, de 0 Ă  9. C’est la beautĂ© de la programmation rĂ©active : vous dĂ©clarez ce que vous voulez faire, et RxJS s’occupe du comment. Je prĂ©pare un article sur la programmation rĂ©active qui devrait sortir au mois de juillet !

Ok, maintenant que l’intro est faite, dĂ©couvrons ensemble les opĂ©rateurs RxJS les plus frĂ©quents qui vous suffiront pour 99% des scĂ©narios.

Les opérateurs de création

of()

Cet opĂ©rateur permet de crĂ©er un Observable Ă  partir d’une valeur qu’on lui passe.

import { of } from "rxjs";

const products$ = of([
  { id: 1, name: "product1", price: 10 },
  { id: 2, name: "product2", price: 30 },
  { id: 3, name: "product3", price: 5 },
]).subscribe(console.log);
// affiche le tableau passé en argument.

Cela peut ĂȘtre utile lorsque vous devez crĂ©er rapidement un Observable Ă  partir de valeurs statiques, par exemple dans un service pour simuler une donnĂ©e qui viendrait du backend.

interval

Crée un Observable qui émet une séquence infinie de valeurs entiÚres à intervalles réguliers.

import { interval } from "rxjs";

const counter = interval(1000).subscribe(console.log);
// affiche 0..1..2..3..4 chaque seconde

C’est utile pour crĂ©er des compteurs, des dĂ©lais ou des intervalles de temps, par exemple pour une fonctionnalitĂ© de minuterie dans une application.

combineLatest

Cet opĂ©rateur combine les derniĂšres valeurs de plusieurs Observables. Il Ă©met une nouvelle valeur chaque fois que l’un des Observables qu’il combine Ă©met une valeur. Il est Ă  noter que combineLatest ne va pas Ă©mettre tant que chacun de ses Observables n’a pas Ă©mis au moins une valeur.

import { combineLatest, of } from "rxjs";
import { delay } from "rxjs/operators";

const products$ = http.get("api/products");
const orders$ = http.get("api/orders");
const result$ = combineLatest({ products: products$, orders: orders$ });

result$.subscribe(({ products, orders }) => {
  console.log(products);
  console.log(orders);
});

C’est pratique lorsque vous voulez combiner les rĂ©sultats de plusieurs requĂȘtes HTTP ou lorsque vous souhaitez synchroniser plusieurs flux de donnĂ©es pour faire une opĂ©ration basĂ©e sur plusieurs flux.

merge

Cet opĂ©rateur combine plusieurs Observables en un seul, en Ă©mettant les valeurs de chaque Observable dĂšs qu’elles sont disponibles.

import { merge, of } from "rxjs";
import { delay } from "rxjs/operators";

const obs1 = of("Hello");
const obs2 = of("World").pipe(delay(1000));
const merged = merge(obs1, obs2);

merged.subscribe((value) => console.log(value));
// Affiche 'Hello', puis 'World' aprÚs un délai

Utile lorsque vous voulez fusionner plusieurs flux de donnĂ©es en un seul, par exemple lors de l’agrĂ©gation de donnĂ©es Ă  partir de plusieurs sources de donnĂ©es dans divers services.

defer

Il permet de retarder la crĂ©ation de l’Observable jusqu’à ce qu’un Observateur s’y abonne. Cela signifie que l’Observable n’est crĂ©Ă© qu’au moment de l’abonnement, ce qui permet de s’assurer que l’Observable encapsule toujours les donnĂ©es les plus Ă  jour.

import { defer } from "rxjs";

const currentTime$ = defer(() => of(new Date().getTime()));

currentTime$.subscribe(console.log);
// 👆 m'affiche bien le moment actuel
// qu'importe Ă  quel moment je subscribe
// si j'avais utilisé simplement of(new Date().getTime())
// alors cela m'aurait retournĂ© le mĂȘme rĂ©sultat Ă  chaque subscribe

C’est utile quand vous voulez retarder l’exĂ©cution de la fonction qui gĂ©nĂšre l’Observable jusqu’à ce qu’un abonnĂ© soit prĂ©sent.

Les opérateurs de réaction

map

Cet opĂ©rateur est utilisĂ© pour appliquer une fonction Ă  chaque valeur Ă©mise par l’Observable source.

import { of } from "rxjs";
import { map } from "rxjs/operators";

const nums = of(1, 2, 3);
const squareNums = nums.pipe(map((n) => n * n));

squareNums.subscribe((x) => console.log(x)); // Affiche 1, 4, 9

C’est utile lorsque vous voulez transformer les donnĂ©es reçues d’un Observable, par exemple pour reformater les donnĂ©es reçues d’une API.

mergeMap

Il permet de gĂ©rer les situations oĂč on voudrait subscribe dans un subscribe. C’est Ă  dire les situations oĂč les valeurs Ă©mient par le flux de donnĂ©es sont elles-mĂȘmes des Observable ou bien qu’on veut renvoyer un Observable grĂące Ă  la donnĂ©e Ă©mise. mergeMap va s’abonner Ă  la valeur et renvoyer le rĂ©sultat de l’abonnement.

import { mergeMap } from 'rxjs/operators';

@Component(...)
export class AppComponent {
  http = inject(HttpClient);
  productID$ = new FormControl();

  product$ = productID$.pipe(
    mergeMap(productID => this.http.get(`api/products/${productID}`))
  )
}

Il est particuliĂšrement utile pour gĂ©rer les scĂ©narios oĂč chaque valeur Ă©mise par un Observable est elle-mĂȘme un Observable, comme lors de l’envoi de requĂȘtes HTTP en rĂ©ponse Ă  des Ă©vĂ©nements de l’utilisateur.

switchMap

Cet opĂ©rateur est similaire Ă  mergeMap(), mais annule les valeurs prĂ©cĂ©dentes chaque fois qu’une nouvelle valeur est Ă©mise.

import { switchMap } from 'rxjs/operators';

@Component(...)
export class AppComponent {
  http = inject(HttpClient);
  productID$ = new FormControl();

  // si productID$ Ă©met avant que le http.get se termine
  // alors le call HTTP sera cancel et remplacé par le suivant
  product$ = productID$.pipe(
    switchMap(productID => this.http.get(`api/products/${productID}`))
  )
}

C’est pratique lorsque vous voulez ignorer les anciennes valeurs en faveur des nouvelles, par exemple lors de l’implĂ©mentation d’une fonction de recherche autocomplĂšte oĂč seule la derniĂšre requĂȘte est pertinente.

exhaustMap

Cet opĂ©rateur est aussi similaire Ă  mergeMap(), mais il ignore les nouvelles valeurs tant que chaque valeur Ă©mise n’a pas terminĂ©.

import { exhaustMap } from 'rxjs/operators';

@Component(...)
export class AppComponent {
  http = inject(HttpClient);
  productID$ = new FormControl();

  // si productID$ Ă©met avant que le http.get se termine
  // alors cette émission sera ignorée
  product$ = productID$.pipe(
    exhaustMap(productID => this.http.get(`api/products/${productID}`))
  )
}

C’est utile dans des situations oĂč vous voulez ignorer les nouvelles valeurs jusqu’à ce que chaque valeur Ă©mise ait terminĂ© son traitement, comme lors de la gestion des clics sur un bouton pour empĂȘcher les doubles soumissions.

filter

Cet opérateur émet uniquement les valeurs qui satisfont une certaine condition.

import { of } from "rxjs";
import { filter } from "rxjs/operators";

const nums = of(1, 2, 3, 4, 5);
const evens = nums.pipe(filter((n) => n % 2 === 0));

evens.subscribe((x) => console.log(x)); // Affiche 2, 4

Il est utile lorsque vous voulez filtrer certains rĂ©sultats, par exemple en filtrant certaines entrĂ©es utilisateur ou certains rĂ©sultats d’une API.

scan

Cet opĂ©rateur fonctionne comme reduce() pour les tableaux. Il applique une fonction Ă  chaque valeur Ă©mise par l’Observable source et Ă©met le rĂ©sultat cumulatif.

import { of } from "rxjs";
import { scan } from "rxjs/operators";

const nums = of(1, 2, 3);
const sum = nums.pipe(scan((acc, curr) => acc + curr, 0));

sum.subscribe((x) => console.log(x)); // Affiche 1, 3, 6

C’est utile lorsque vous voulez accumuler des valeurs au fil du temps, par exemple pour calculer une somme totale ou pour construire un tableau de valeurs Ă©mises.

distinctUntilChanged

Cet opérateur émet une valeur uniquement si elle est différente de la derniÚre valeur émise.

import { of } from "rxjs";
import { distinctUntilChanged } from "rxjs/operators";

const nums = of(1, 1, 2, 2, 3, 3);
const distinctNums = nums.pipe(distinctUntilChanged());

distinctNums.subscribe((x) => console.log(x)); // Affiche 1, 2, 3

C’est utile lorsque vous voulez ignorer les valeurs en double, par exemple lorsqu’on crĂ©Ă© un strore fait-maison ou chaque selector utilise distinctUntilChanged() pour Ă©viter d’émettre Ă  nouveau si la valeur est la mĂȘme.

tap

Cet opĂ©rateur est utilisĂ© pour effectuer des effets de bord. Il applique une fonction Ă  chaque valeur Ă©mise par l’Observable source, mais retourne la valeur sans la modifier.

import { of } from "rxjs";
import { tap } from "rxjs/operators";

const nums = of(1, 2, 3).pipe(tap(console.log)).subscribe();

C’est utile lorsque vous voulez dĂ©clencher une action secondaire en rĂ©action Ă  une valeur Ă©mise, la mise Ă  jour d’un Ă©tat (loader par exemple) ou un console.log() pour regarder l’état du flux.

delay

Permet de dĂ©caler le moment oĂč les valeurs Ă©mises par un Observable sont rĂ©ellement Ă©mises. Cela retarde essentiellement l’émission de chaque valeur d’un certain nombre de millisecondes.

import { delay } from "rxjs/operators";

const data$ = of({ some: "data" });

data$.pipe(delay(2000)).subscribe(console.log);
// affiche l'objet passé en paramÚtre mais au bout de 2 secondes

C’est utile lorsque vous voulez retarder une action, comme l’affichage d’un message de succùs aprùs une action de l’utilisateur.

take

Cet opĂ©rateur n’émet que les n premiĂšres valeurs puis complĂšte l’Observable.

import { interval } from "rxjs";
import { take } from "rxjs/operators";

const numbers = interval(1000);
const takeFourNumbers = numbers.pipe(take(4));

takeFourNumbers.subscribe(console.oog);
// Affiche la valeur quatre fois puis se complĂšte

C’est utile lorsque vous ne voulez traiter qu’un certain nombre de valeurs, comme pour paginer les rĂ©sultats d’une requĂȘte ou limiter le nombre de tentatives d’une action.

takeUntil

Cet opĂ©rateur complĂšte l’Observable dĂšs qu’une autre Observable Ă©met une valeur.

import { interval, of } from "rxjs";
import { takeUntil, delay } from "rxjs/operators";

const numbers = interval(1000);
const stopper = of(true).pipe(delay(3500));
const takeUntilStopped = numbers.pipe(takeUntil(stopper));

takeUntilStopped.subscribe(console.log);
// Affiche la valeur jusqu'Ă  ce que stopper Ă©mette une valeur

C’est utile lorsque vous voulez arrĂȘter une action dĂšs qu’un autre Ă©vĂ©nement se produit, comme arrĂȘter un intervalle lorsqu’un utilisateur clique sur un bouton ou unsubscribe d’un flux.

takeWhile

Cet opĂ©rateur Ă©met des valeurs tant qu’une condition est vraie et complĂšte l’Observable dĂšs que la condition devient fausse.

import { interval } from "rxjs";
import { takeWhile } from "rxjs/operators";

const numbers = interval(1000);
const takeWhileLessThanFive = numbers.pipe(takeWhile((n) => n < 5));

takeWhileLessThanFive.subscribe(console.log);
// Affiche la valeur tant que n est inférieur à 5

C’est utile lorsque vous voulez continuer Ă  Ă©mettre des valeurs en fonction d’une condition, comme arrĂȘter un dĂ©filement automatique quand l’utilisateur atteint le bas de la page.

skip

Cet opĂ©rateur ignore les n premiĂšres valeurs Ă©mises par l’Observable source.

import { interval } from "rxjs";
import { skip } from "rxjs/operators";

const numbers = interval(1000);
const skipTwoNumbers = numbers.pipe(skip(2));

skipTwoNumbers.subscribe(console.log);
// Ignore les deux premiĂšres valeurs

C’est utile lorsque vous voulez ignorer les premiùres valeurs, comme lors de la mise en place d’une fonction de pagination.

debounceTime

Cet opĂ©rateur n’émet une valeur que si un certain temps s’est Ă©coulĂ© sans qu’une autre valeur soit Ă©mise.

import { debounceTime } from "rxjs/operators";

inputFormControl = new FormControl();

inputFormControl.valueChanges.pipe(debounceTime(300)).subscribe(console.log);
// Émet une valeur 300ms aprùs la derniùre saisie de l'utilisateur

C’est utile lorsque vous voulez limiter le taux de traitement, comme pour Ă©viter de faire trop de requĂȘtes pendant la frappe de l’utilisateur dans une recherche en direct.

catchError

Cet opĂ©rateur attrape les erreurs sur l’Observable source et permet de renvoyer un nouvel Observable.

import { catchError } from 'rxjs/operators';

@Component(...)
export class AppComponent {
  http = inject(HttpClient);

  products$ = this.http.get('api/products').pipe(
      catchError(error => {
        console.error('Une erreur est survenue lors de la récupération des données :', error);
        // On peut afficher un message d'erreur Ă©galement

        // On retourne un Observable avec des données par défaut en cas d'erreur
        return of({ data: 'Données par défaut' });
      })
    );

}

C’est utile lorsque vous voulez gĂ©rer les erreurs et peut-ĂȘtre remplacer les valeurs erronĂ©es par des valeurs par dĂ©faut.

retry

Cet opĂ©rateur re-souscrit Ă  l’Observable source si une erreur est produite, en permettant un certain nombre de tentatives.

import { retry } from 'rxjs/operators';

@Component(...)
export class AppComponent {
  http = inject(HttpClient);

  products$ = this.http.get('api/products').pipe(
    retry(3) // RĂ©essaye la requĂȘte HTTP 3 fois en cas d'erreur
  );

}

C’est utile lorsque vous voulez rĂ©essayer une action en cas d’erreur, comme pour retenter une requĂȘte HTTP qui a Ă©chouĂ©.

startWith

Cet opĂ©rateur fait en sorte que l’Observable source commence par Ă©mettre une ou plusieurs valeurs spĂ©cifiques.

import { of } from "rxjs";
import { startWith } from "rxjs/operators";

const numbers = of(1, 2, 3);
const startWithZero = numbers.pipe(startWith(0));

startWithZero.subscribe((x) => console.log(x)); // Affiche 0, 1, 2, 3

C’est utile lorsque vous voulez fournir des valeurs initiales, comme pour prĂ©-remplir un champ de formulaire ou afficher une valeur par dĂ©faut pendant le chargement des donnĂ©es.