Tout ce que je sais sur RxJS
Lâautre jour un membre de ma communautĂ© Discord a posĂ© une question trĂšs intĂ©ressante
Je lui ai Ă©crit une rĂ©ponse qui mâa pris 45min Ă rĂ©diger tant cette question implique beaucoup de choses. Et ça mâa donnĂ© lâenvie de me lancer dans un article sur un sujet que jâaime : RxJS. Et la question Observable
vs Subject
est un bel angle par lequel Ă©tudier tout cela.
Alors faisons un grand plongeon dans RxJS pour essayer de démystifier de maniÚre la plus simple possible tous les rouages de cette librairie.
RxJS, ça devrait vous ĂȘtre familier. Câest une librairie qui permet de gĂ©rer lâasynchronicitĂ© des Ă©vĂšnements et des donnĂ©es au sein dâune application Javascript, et RxJS est largement utilisĂ© dans Angular.
Et mĂȘme si les Signal vont probablement rĂ©duire lâutilisation de la librairie, ce nâest pas prĂ©vu de si tĂŽt et ça ne sera pas complĂštement le cas, alors faisons un point sur une question fondamentale qui va nous permettre de comprendre RxJS dans son intĂ©gralitĂ© : câest quoi la diffĂ©rence entre un Observable
et un Subject
?
Avant de parler de leurs différences, parlons de leurs ressemblances.
import { Observable, Subject } from "rxjs";
const obs = new Observable();
const sub = new Subject();
Ce quâon peut voir, câest que ce sont tous les deux des class que lâon peut instancier.
import { Observable, Subject } from "rxjs";
const obs = new Observable();
const sub = new Subject();
obs.subscribe((data) => console.log(`Obs ${data}`));
sub.subscribe((data) => console.log(`Sub ${data}`));
Mais lâintĂ©rĂȘt de ces deux class
, câest surtout quâon peut sâabonner dessus avec la fonction subscribe
. âSâabonnerâ veut dire quâĂ lâinstant oĂč une donnĂ©e va ĂȘtre poussĂ©e dans lâObservable
ou le Subject
alors ces derniers vont Ă©mettre cette donnĂ©e, ainsi chaque abonnĂ© va recevoir cette nouvelle donnĂ©e immĂ©diatement et ce quâil y a dans le .subscribe()
va se trigger.
On peut donc en conclure que les Observable
et Subject
permettent de crĂ©er des flux de donnĂ©es, ce flux est continu, il ne sâinterrompt pas (sauf si on lui dit explicitement). Et pour ouvrir ce flux de donnĂ©es, il faut sâabonner en utilisant .subscribe()
.
Dans mon exemple je me suis abonnĂ© Ă la fois Ă obs et sub, ainsi dĂšs quâune donnĂ©e va ĂȘtre poussĂ©e dans lâun ou lâautre alors le console.log
va sâexĂ©cuter.
Mais pas avant !
Ce que je veux dire par lĂ , câest que si jâexĂ©cute ce script, rien ne va sâafficher. Pourquoi ? Parce quâĂ aucun moment je nâai poussĂ© de nouvelle donnĂ©e. Mes deux abonnements sont faits, les flux sont ouverts, mais Ă©tant donnĂ© que je nâai pas encore poussĂ© de donnĂ©e alors obs
ou sub
nâĂ©mettent rien, donc je ne rentre pas dans le callback du tout.
En rĂ©sumĂ©, voilĂ comment le dĂ©roulĂ© de âvieâ dâun Observable
(ça marche pareil pour un Subject
)
- Je crĂ©Ă© lâ
Observable
avecconst obs = new Observable();
- Dans divers endroits de mon application je mâabonne Ă mon
Observable
en faisantobs.subscribe(...)
- DĂšs quâune donnĂ©e est poussĂ©e dans lâ
Observable
(on va voir comment juste aprĂšs) alorsobs
va lâĂ©mettre, en dâautres termes il va la pousser dans son flux - Les abonnĂ©es vont recevoir cette donnĂ©e car ils Ă©coutent le flux et pourront faire des trucs divers et variĂ©s avec cette donnĂ©e
.next() pour pousser des données
import { Subject } from "rxjs";
const sub = new Subject();
sub.subscribe((data) => console.log(`Sub ${data}`));
sub.next("Salut !");
Mesdames et messieurs, devant aux yeux ébahis je viens de pousser une donnée dans sub
! đ
Maintenant, que se passe t-il si jâexĂ©cute ce code ? Et bien vous lâaurez devinĂ©, je verrais Sub Salut
! qui provient du console.log("Sub ${data}")
. Tout simplement parce que lorsque jâai fait sub.next('Salut !)
, jâai poussĂ© une nouvelle donnĂ©e dans le Subject
et ce dernier lâa Ă©mise, ainsi tous les abonnĂ©s lâont reçues.
Je dis bien âtous les abonnĂ©sâ car je peux mâabonner plusieurs fois :
import { Subject } from "rxjs";
const sub = new Subject();
sub.subscribe((data) => console.log(`Sub1 ${data}`));
sub.subscribe((data) => console.log(`Sub2 ${data}`));
sub.subscribe((data) => console.log(`Sub3 ${data}`));
sub.next("Salut !");
Ainsi, tous les abonnĂ©s vont recevoir cette mĂȘme valeur. A noter que si jâavais mis le sub.next('Salut !');
avant les subscribe()
alors les abonnĂ©s nâauraient reçu aucune donnĂ©e car la valeur aurait Ă©tĂ© Ă©mise avant un quelconque abonnement.
Utiliser les Subject
dans Angular
Mais alors à quoi ça sert ? Comment tirer profit de cela ?
Et bien RxJS est trĂšs utile dans le monde JS pour gĂ©rer des Ă©vĂšnements de maniĂšres isolĂ©es. Par exemple si je veux que le clique dâun bouton mette Ă jour un compteur et que la valeur de ce compteur soit affichĂ© dans plusieurs composants diffĂ©rents au sein de mon application, je peux utiliser un Subject
. Ce Subject
je vais le crĂ©er dans un fichier Ă part et pas directement dans un composant car sa valeur nâappartient Ă aucun composant en particulier. Et ce fichier, dans le cas de Angular, sera un service.
@Injectable({ providedIn: "root" })
export class CounterService {
readonly count = new Subject<number>();
constructor() {
this.count.next(0); // j'initialise la valeur Ă 0
}
increment() {
// this.count.next(???); que mettre ici ?
}
}
Attendez une minute⊠Comment on fait pour récupérer la valeur actuelle de count
pour lâincrĂ©menter ?
BehaviorSubject
Ă la rescousse !
import { BehaviorSubject } from "rxjs";
@Injectable({ providedIn: "root" })
export class CounterService {
readonly count = new BehaviorSubject<number>(0);
increment() {
this.count.next(this.count.value + 1);
}
}
BehaviorSubject
câest une autre class qui vient de RxJS et qui est trĂšs similaire Ă Subject, sauf que :
BehaviorSubject
a une notion de âvaleur couranteâ. Quand unBehaviorSubject
Ă©met une donnĂ©e, il va Ă©galement lâenregistrer en son sein. Par consĂ©quent, on a accĂšs Ă la propriĂ©tĂ© .value qui va nous renvoyer cette valeur- Quand on initialise un
BehaviorSubject
on doit lui passer une valeur initiale, câest exactement ce que je fais en faisantreadonly count = new BehaviorSubject<number>(0)
. Du coup, plus besoin de faire lethis.count.next(0)
dans leconstructor
!
PlutĂŽt pas mal non ? Je peux maintenant utiliser les valeurs Ă©mient par count
en mâabonnant Ă celui-ci directement dans mes composants et Ă©galement pousser une valeur incrĂ©mentĂ©e :
@Component({
standalone: true,
template: `
<p>Counter: {{ count }}</p>
<button (click)="counterService.increment()">+1</button>
`,
})
export class SomeComponent implements OnInit {
readonly countService = inject(CounterService);
count!: number;
ngOnInit() {
this.countService.count.subscribe((value) => (this.count = value));
}
}
Je peux implĂ©menter cette fonctionnalitĂ© partout oĂč je veux dans mon application, certains autres composants pourraient utiliser une fonction decrement
, ou simplement afficher la valeur de count
. Jâai une application oĂč la separation of concerns est respectĂ©e ! Le CounterService
crĂ©Ă© est ce quâon appelle un âstoreâ : une source de vĂ©ritĂ© qui contient mes donnĂ©es, le moyen dâaccĂ©der Ă ces donnĂ©es par dâautres briques de mon application, et les mĂ©thodes qui modifient ces donnĂ©es.
En bref, utilisez BehaviorSubject
lorsque vous voulez crĂ©Ă© un store manuellement sans librairie tiĂšrce ! Câest dâailleurs ce quâutilise NgRx sous la capot.
Mais⊠Mon implĂ©mentation actuel est loin dâĂȘtre parfaite ! AmĂ©liorons tout ça en introduisant une notion : les Observable
.
import { Observable } from "rxjs";
@Injectable({ providedIn: "root" })
export class CounterService {
private readonly count = new BehaviorSubject<number>(0);
readonly count$ = this.count.asObservable();
increment() {
this.count.next(this.count.value + 1);
}
}
Ici il y a deux différences :
- Jâai crĂ©Ă© une propriĂ©tĂ© publique
count$
qui est Ă©gale Ăthis.count.asObservable()
- Jâai mis le
BehaviorSubject
en privé
count$
est donc un Observable
grĂące Ă .asObservable()
. Un Observable est un flux de données tout comme un Subject
ce qui signifie quâon peut faire un subscribe()
dessus. Mais la grande différence est la suivante : sur un Observable
on ne peut pas faire de .next()
.
Un Observable
est comme un Subject
mais read-only, on ne peut pas lui pousser des donnĂ©es. Câest donc trĂšs utile pour exposer aux autres parties de mon application des donnĂ©es sans leurs donner la possibilitĂ© de les modifier directement car cette responsabilitĂ© incombe au service oĂč est dĂ©clarĂ©e ma donnĂ©e.
Je peux donc changer le code de mon composant afin dâutiliser count$
, jâen profite le rendre plus optimal en utilisant le pipe async
qui a pour but subscribe
automatiquement et de unsubscribe
quand mon composant est détruit (par exemple quand je change de page) :
@Component({
standalone: true,
template: `
<p>Counter: {{ countService.count$ | async }}</p>
<button (click)="counterService.increment()">increment</button>
`,
})
export class SomeComponent {
readonly countService = inject(CounterService);
}
Comment fonctionne les Observable
Allons plus loin dans la découverte des Observable
en nâen crĂ©ant un.
import { Observable } from "rxjs";
const obs = new Observable();
Comme je le disais plus haut, cet obs
ne fait rien, en rĂ©alitĂ© on ne crĂ©Ă© jamais dâObservable
comme ça car un Observable
attend une fonction en callback :
import { Observable } from "rxjs";
const obs = new Observable((subscriber) => {
subscriber.next("Salut !");
});
obs.subscribe((data) => console.log(data)); // Salut !
Câest en rĂ©alitĂ© dans le callback dâun Observable
que lâon doit faire notre .next()
, et on le fait sur lâargument subscriber
qui correspond en rĂ©alitĂ© au flux de donnĂ©es que chaque abonnĂ© Ă©coutera lorsquâil fera .subscribe()
.
import { Observable } from "rxjs";
const obs = new Observable((subscriber) => {
subscriber.next("Salut ");
subscriber.next("comment ");
subscriber.next("ça va ?");
});
obs.subscribe((data) => console.log(data));
Ici je vais bel et bien avoir 3 console.log()
comme ceci :
Salut ! // premier console.log
comment // deuxiĂšme console.log
ça va ? // troisiÚme console.log
Mon Observable
, et plus particuliĂšrement son callback, fait 3 .next()
sur le flux de donnĂ©es, poussant donc 3 valeurs, ainsi au moment oĂč je vais subscribe()
le flux va Ă©mettre trois fois, donc je passerai 3 fois Ă lâintĂ©rieur du subscribe()
.
Il est important de comprendre que câest le fait de .subscribe()
qui dĂ©clenche lâouverture du flux et donc qui trigger le callback passĂ© dans lâObservable
. Câest exactement comme le http.get()
(et les autres requĂȘtes) du HttpClient
de Angular, si je ne subscribe() pas, le call http ne se lance pas. A chaque fois quâon va subscribe()
ça déclenchera un nouveau call http et créera un nouveau flux indépendant.
Observable
et valeurs dynamiques
Les Observable
ne vont pas systématiquement pousser des données statiques !
import { Observable } from "rxjs";
const rand = new Observable((subscriber) => {
subscriber.next(Math.random());
});
rand.subscribe((data) => console.log(data)); // 0.159812
rand.subscribe((data) => console.log(data)); // 0.811699
rand.subscribe((data) => console.log(data)); // 0.422368
Comme vous le voyez, la donnĂ©e que lâon pousse dans le .next()
nâest pas forcĂ©ment une valeur figĂ©e. Ici câest Math.random()
que je pousse. De ce fait Ă chaque subscribe()
le Math.random()
va ĂȘtre executĂ©, on aura donc une valeur diffĂ©rente.
Un autre exemple :
const counter = new Observable((subscriber) => {
let counter = 0;
setInterval(() => {
subscriber.next(counter++);
}, 1000);
});
counter.subscribe((data) => console.log(data)); // Ă©met toutes les secondes
// 5 secondes plus tard je m'abonne Ă nouveau
setTimeout(() => {
counter.subscribe((data) => console.log(data)); // Ă©met toutes les secondes
}, 5000);
counter
dĂ©clenche un intervalle oĂč chaque seconde une nouvelle donnĂ©e est poussĂ©e, donc chaque abonnement va crĂ©er un nouvel intervalle. Jâai fait dĂ©marrer le second abonnement 5 secondes plus tard juste pour vous montrer que les flux sont vraiment indĂ©pendants.
Et câest une diffĂ©rence fondamentale entre les Observable et les Subject !
- Avec les
Observables
, chaque abonnĂ© crĂ©Ă© un flux de donnĂ©es indĂ©pendant entre lui etlâObservable
dĂšs quâil.subscribe()
. On dit alors que les Observable font du unicasting. - Avec les
Subject
, tous les abonnĂ©s se branchent au mĂȘme flux de donnĂ©es en faisant un.subscribe()
et dĂšs que le Subject Ă©met suite Ă un.next()
alors les abonnés vont tous recevoir cette nouvelle valeur. On dit alors que les Subjectfont du multicasting.
Et cette diffĂ©rence repose uniquement sur la question de lâendroit oĂč se fait le .next()
, et en rĂ©alitĂ© câest tout Ă fait logique, regardez cet exemple :
import { Observable, Subject } from "rxjs";
const randObs = new Observable((subscriber) => {
subscriber.next(Math.random());
});
randObs.subscribe(console.log);
randObs.subscribe(console.log);
randObs.subscribe(console.log);
const randSub = new Subject();
randSub.subscribe(console.log);
randSub.subscribe(console.log);
randSub.subscribe(console.log);
randSub.next(Math.random());
Les trois abonnés sur randObs
sont complĂštement indĂ©pendant, et dĂšs quâils ont subscribe()
ils ont crĂ©Ă© un nouveau flux et relancer un nouveau Math.random(), donc ils nâont pas reçu la mĂȘme valeur !
Tandis que pour le Subject
, jâai dâabord crĂ©Ă© trois abonnements puis je pousse un nouveau Math.random()
donc les trois abonnements vont bien recevoir la mĂȘme chose !
Câest Ă©galement le cas avec un BehaviorSubject
:
const randSub = new BehaviorSubject(Math.random());
randSub.subscribe(console.log);
randSub.subscribe(console.log);
randSub.subscribe(console.log);
Les trois abonnĂ©s vont recevoir la mĂȘme donnĂ©e initiale, pourquoi ? Et bien tout simplement parce quâun BehaviorSubject
exĂ©cute ce quâon lui passe en paramĂštre dĂšs quâil se crĂ©Ă©.
Le parallĂšle avec Angular
Bon, câest intĂ©ressant tout ça, mais comment ça sâinscrit dans la logique de crĂ©ation dâun application Angular ?
En fait, vous nâallez quasiment jamais faire de new Observable
, en tout cas personnellement en 7 ans dâutilisation dâAngular je ne lâai pas fait une seule fois. Tandis que new Subject
ou new BehaviorSubject
est relativement commun (moins si vous utilisez une librairie de State Management car celles-ci lâutilisent dĂ©jĂ sous le capot), ça vous servira Ă crĂ©er ce pattern oĂč vous aurez une donnĂ©e indĂ©pendante que vous pourrez lire et Ă©crire depuis nâimporte quel composant ou service en respectant une separation of concerns.
En revanche, vous allez beaucoup utiliser les Observables
mis Ă disposition par Angular :
- Le
HttpClient
et ses requĂȘtes qui renvoient desObservables
ActivatedRoute
qui permet de sâabonner Ă la route courantevalueChanges
pour Ă©couter les changements de valeurs des diffĂ©rents champs dâun formulaire- Et bien dâautres
Dâailleurs on peut facilement imaginer ce Ă quoi pourrait ressembler lâimplĂ©mentation de http.get()
de Angular :
const httpGet = (endpoint: string) => new Observable(subscriber => {
// ici il y a la requĂȘtte HTTP avec xhr en
// utilisant l'endpoint en paramĂštre
const resultFromHttpRequest = ...;
subscriber.next(dataFromHttpRequest);
subscriber.complete()
})
Bien entendu la vraie implémentation de la fonction est plus complexe car on gÚre les erreurs, les side effects, le HttpHeader
etc, mais dans lâidĂ©e câest bien ça ! Le subscriber.complete()
sert Ă couper le flux de tous les abonnĂ©s, dans les requĂȘtes de HttpClient
câest fait automatiquement.
Dâailleurs en parlant de ça, subscribe()
ne prend pas forcĂ©ment quâune fonction callback, on peut aussi lui passer un objet pour prĂ©ciser quoi faire en cas dâerreur ou lors du complete
(quand le flux est coupé) :
const obs = new Observable((subscriber) => {
try {
subscriber.next(Math.random());
subscriber.complete();
} catch (error) {
throw new Error(error);
}
});
obs.subscribe({
next: (data) => console.log(data),
error: (error) => console.error(error),
complete: () => console.log("complete"),
});
next()
va sâexĂ©cuter Ă chaque Ă©mission de nouvelle donnĂ©eerror
va sâexĂ©cuter quand obs va lever une erreurcomplete
va sâexĂ©cuter quand le flux va se couper
Les opérateurs
Ces outils puissants (et parfois difficiles Ă maĂźtriser) vous permettront dâapprĂ©hender la programmation rĂ©active au sein de vos applications Angular. Pour guider la suite de cette article, nous tenterons de rĂ©pondre Ă deux questions :
- Quâest-ce quâun opĂ©rateur RxJS?
- Quels sont les opérateurs les plus utilisés et pour quelles raisons ?
Quâest-ce quâun opĂ©rateur RxJS ?
Comme vous le savez dĂ©jĂ , un Observable est un mĂ©canisme de gestion de flux de donnĂ©es. Chaque fois quâune donnĂ©e est injectĂ©e dans un Observable (via une mĂ©thode .next()
), cette donnée est émise à tous les abonnés (via un subscribe()
ou | async
).
Câest lĂ quâinterviennent les opĂ©rateurs. Leur rĂŽle est dâintervenir dans le flux de donnĂ©es et de faire quelque chose Ă chaque Ă©mission, par exemple transformer la donnĂ©e, la filtrer, et bien dâautres encore.
Il existe également des opérateurs qui vont créer des Observables à partir de rien, ceux-là ne vont donc pas modifier les valeurs émient mais bel et bien construire leurs propres flux.
Il existe donc deux grandes catĂ©gories dâopĂ©rateurs :
- les opérateurs de réaction, car ils font quelque chose à chaque émission de donnée
- les opérateurs de création, car ils permettent de créer des Observables
Voici un exemple qui illustre comment utiliser les opérateurs dans un contexte Angular :
import { interval } from "rxjs";
@Component({
template: `Compteur : {{ counter$ | async }}`,
})
export class AppComponent {
// va Ă©mettre 0..1..2..3..4.. chaque seconde dĂšs qu'on s'abonne
readonly counter$ = interval(1000);
}
Dans cet exemple, grĂące Ă lâopĂ©rateur de crĂ©ation interval()
, jâai crĂ©e un Observable qui va Ă©mettre 1, puis 2, puis 3, puis 4 etc chaque seconde (car jâai prĂ©cisĂ© 1000 dans ses paramĂštres) dĂšs lors quâon sâabonne dessus, câest donc utile pour faire des compteurs.
Maintenant, admettons que jâai envie de doubler la valeur de chacune des Ă©missions. Je pourrais faire comme ça :
import { interval } from "rxjs";
@Component({
template: `Compteur : {{ counter }}`,
})
export class AppComponent {
doubleCounter = 0;
readonly counter$ = interval(1000).subscribe(
(value) => (this.doubleCounter = value * 2)
);
}
Cela marcherait trĂšs bien, mais imaginez que jâai besoin de ce compteur doublĂ© Ă plusieurs endroits dans mon application, je devrais refaire ça Ă chaque fois, donc pas top pour dâĂ©ventuelles Ă©volutions ou refacto et en plus il faut penser au unsubscribe
.
Et câest exactement lĂ que les opĂ©rateurs entrent en jeu.
import { interval, map } from "rxjs";
@Component({
template: `Compteur : {{ doubleCounter$ | async }}`,
})
export class AppComponent {
// va Ă©mettre 0..2..4..6..8.., chaque seconde
readonly doubleCounter$ = interval(1000).pipe(map((value) => value * 2));
}
Les opérateurs se mettent dans myObs$.pipe(...)
, il faut vraiment le voir comme une opĂ©ration qui se dĂ©clenche Ă chaque Ă©mission. Ici, jâutilise lâopĂ©rateur map
qui a pour but de transformer la donnĂ©e Ă chaque Ă©mission, et lĂ en lâoccurrence je double sa valeur.
Et ce qui est formidable (et trĂšs important), câest quâon peut enchainer les opĂ©rateurs dans la fonction pipe
.
import { interval, map } from "rxjs";
@Component({
template: `Compteur : {{ doubleCounter$ | async }}`,
})
export class AppComponent {
readonly doubleCounter$ = interval(1000).pipe(
tap((value) => console.log(value)), // 0..1..2..3..
map((value) => value * 2), // double la donnée à chaque émission
tap((value) => console.log(value)), // 0..2..4..6..
filter((value) => value > 10) // bloque les émissions si inférieur à 10
);
}
Ici on fait pleins de trucs :
tap
a pour but dâeffectuer des effets de bords, câest Ă dire quâon ne modifie pas le flux actuel, mais on y a accĂšs quand mĂȘme. Ici je lâutilise deux fois, la premiĂšre fois pour faire un console.log AVANT le map et la seconde fois APRES. Câest trĂšs utile pour se rendre compte de ce quâĂ©met notre Observable.map
modifie chaque émission, comme dit précédemment.filter
qui va bloquer les émissions si la valeur renvoyée par la fonction estfalse
. Ici donc lâObservable ne va Ă©mettre que si la valeur est plus grande que 10.
Les opĂ©rateurs sont donc un excellent moyen de contrĂŽler nos flux de donnĂ©es ! Câest difficile de se rendre compte de la puissance des opĂ©rateurs donc je vous invite Ă essayer par vous-mĂȘme sur ce lien Stackblitz.
Mais à quoi ça sert tout ça ?
âJe pourrais faire la mĂȘme chose avec une fonction qui utilise un setInterval
et quelques if
non ?â
Câest vrai, mais RxJS vous ouvre les voies de la programmation rĂ©active et dĂ©clarative. Câest Ă dire que vous dĂ©clarez ce qui doit se passer en rĂ©ponse Ă certains Ă©vĂ©nements, sans avoir Ă vous soucier des dĂ©tails de mise en Ćuvre.
Avec setInterval
, vous devez gĂ©rer manuellement les cas oĂč vous souhaitez arrĂȘter lâintervalle, reprendre lâintervalle ou gĂ©rer des erreurs.
Par contre, avec lâopĂ©rateur interval de RxJS, vous obtenez un Observable qui Ă©met des Ă©vĂ©nements Ă un intervalle rĂ©gulier. Cet Observable peut ĂȘtre facilement composĂ© avec dâautres Observables, manipulĂ© avec dâautres opĂ©rateurs, ou arrĂȘtĂ© ou repris Ă tout moment. En cas dâerreur, vous pouvez gĂ©rer cela de maniĂšre dĂ©clarative avec les opĂ©rateurs dâerreur.
Le véritable pouvoir de RxJS réside dans la combinaison de ces opérateurs pour créer des flux de données complexes et gérables de maniÚre trÚs lisible et maintenable.
Par exemple, imaginez que vous souhaitez émettre un événement toutes les secondes, mais seulement pendant les 10 premiÚres secondes. Avec setInterval
, vous devriez mettre en place un compteur, un if
pour vérifier le compteur, et un clearInterval
pour arrĂȘter lâĂ©mission. Avec RxJS, vous pouvez simplement combiner les opĂ©rateurs interval
et take
:
import { interval } from "rxjs";
import { take } from "rxjs/operators";
interval(1000).pipe(take(10)).subscribe(console.log);
Cela Ă©mettra un nombre croissant chaque seconde, de 0 Ă 9. Câest la beautĂ© de la programmation rĂ©active : vous dĂ©clarez ce que vous voulez faire, et RxJS sâoccupe du comment. Je prĂ©pare un article sur la programmation rĂ©active qui devrait sortir au mois de juillet !
Ok, maintenant que lâintro est faite, dĂ©couvrons ensemble les opĂ©rateurs RxJS les plus frĂ©quents qui vous suffiront pour 99% des scĂ©narios.
Les opérateurs de création
of()
Cet opĂ©rateur permet de crĂ©er un Observable Ă partir dâune valeur quâon lui passe.
import { of } from "rxjs";
const products$ = of([
{ id: 1, name: "product1", price: 10 },
{ id: 2, name: "product2", price: 30 },
{ id: 3, name: "product3", price: 5 },
]).subscribe(console.log);
// affiche le tableau passé en argument.
Cela peut ĂȘtre utile lorsque vous devez crĂ©er rapidement un Observable Ă partir de valeurs statiques, par exemple dans un service pour simuler une donnĂ©e qui viendrait du backend.
interval
Crée un Observable qui émet une séquence infinie de valeurs entiÚres à intervalles réguliers.
import { interval } from "rxjs";
const counter = interval(1000).subscribe(console.log);
// affiche 0..1..2..3..4 chaque seconde
Câest utile pour crĂ©er des compteurs, des dĂ©lais ou des intervalles de temps, par exemple pour une fonctionnalitĂ© de minuterie dans une application.
combineLatest
Cet opĂ©rateur combine les derniĂšres valeurs de plusieurs Observables. Il Ă©met une nouvelle valeur chaque fois que lâun des Observables quâil combine Ă©met une valeur. Il est Ă noter que combineLatest
ne va pas Ă©mettre tant que chacun de ses Observables nâa pas Ă©mis au moins une valeur.
import { combineLatest, of } from "rxjs";
import { delay } from "rxjs/operators";
const products$ = http.get("api/products");
const orders$ = http.get("api/orders");
const result$ = combineLatest({ products: products$, orders: orders$ });
result$.subscribe(({ products, orders }) => {
console.log(products);
console.log(orders);
});
Câest pratique lorsque vous voulez combiner les rĂ©sultats de plusieurs requĂȘtes HTTP ou lorsque vous souhaitez synchroniser plusieurs flux de donnĂ©es pour faire une opĂ©ration basĂ©e sur plusieurs flux.
merge
Cet opĂ©rateur combine plusieurs Observables en un seul, en Ă©mettant les valeurs de chaque Observable dĂšs quâelles sont disponibles.
import { merge, of } from "rxjs";
import { delay } from "rxjs/operators";
const obs1 = of("Hello");
const obs2 = of("World").pipe(delay(1000));
const merged = merge(obs1, obs2);
merged.subscribe((value) => console.log(value));
// Affiche 'Hello', puis 'World' aprÚs un délai
Utile lorsque vous voulez fusionner plusieurs flux de donnĂ©es en un seul, par exemple lors de lâagrĂ©gation de donnĂ©es Ă partir de plusieurs sources de donnĂ©es dans divers services.
defer
Il permet de retarder la crĂ©ation de lâObservable jusquâĂ ce quâun Observateur sây abonne. Cela signifie que lâObservable nâest crĂ©Ă© quâau moment de lâabonnement, ce qui permet de sâassurer que lâObservable encapsule toujours les donnĂ©es les plus Ă jour.
import { defer } from "rxjs";
const currentTime$ = defer(() => of(new Date().getTime()));
currentTime$.subscribe(console.log);
// đ m'affiche bien le moment actuel
// qu'importe Ă quel moment je subscribe
// si j'avais utilisé simplement of(new Date().getTime())
// alors cela m'aurait retournĂ© le mĂȘme rĂ©sultat Ă chaque subscribe
Câest utile quand vous voulez retarder lâexĂ©cution de la fonction qui gĂ©nĂšre lâObservable jusquâĂ ce quâun abonnĂ© soit prĂ©sent.
Les opérateurs de réaction
map
Cet opĂ©rateur est utilisĂ© pour appliquer une fonction Ă chaque valeur Ă©mise par lâObservable source.
import { of } from "rxjs";
import { map } from "rxjs/operators";
const nums = of(1, 2, 3);
const squareNums = nums.pipe(map((n) => n * n));
squareNums.subscribe((x) => console.log(x)); // Affiche 1, 4, 9
Câest utile lorsque vous voulez transformer les donnĂ©es reçues dâun Observable, par exemple pour reformater les donnĂ©es reçues dâune API.
mergeMap
Il permet de gĂ©rer les situations oĂč on voudrait subscribe dans un subscribe. Câest Ă dire les situations oĂč les valeurs Ă©mient par le flux de donnĂ©es sont elles-mĂȘmes des Observable ou bien quâon veut renvoyer un Observable grĂące Ă la donnĂ©e Ă©mise. mergeMap va sâabonner Ă la valeur et renvoyer le rĂ©sultat de lâabonnement.
import { mergeMap } from 'rxjs/operators';
@Component(...)
export class AppComponent {
http = inject(HttpClient);
productID$ = new FormControl();
product$ = productID$.pipe(
mergeMap(productID => this.http.get(`api/products/${productID}`))
)
}
Il est particuliĂšrement utile pour gĂ©rer les scĂ©narios oĂč chaque valeur Ă©mise par un Observable est elle-mĂȘme un Observable, comme lors de lâenvoi de requĂȘtes HTTP en rĂ©ponse Ă des Ă©vĂ©nements de lâutilisateur.
switchMap
Cet opĂ©rateur est similaire Ă mergeMap(), mais annule les valeurs prĂ©cĂ©dentes chaque fois quâune nouvelle valeur est Ă©mise.
import { switchMap } from 'rxjs/operators';
@Component(...)
export class AppComponent {
http = inject(HttpClient);
productID$ = new FormControl();
// si productID$ Ă©met avant que le http.get se termine
// alors le call HTTP sera cancel et remplacé par le suivant
product$ = productID$.pipe(
switchMap(productID => this.http.get(`api/products/${productID}`))
)
}
Câest pratique lorsque vous voulez ignorer les anciennes valeurs en faveur des nouvelles, par exemple lors de lâimplĂ©mentation dâune fonction de recherche autocomplĂšte oĂč seule la derniĂšre requĂȘte est pertinente.
exhaustMap
Cet opĂ©rateur est aussi similaire Ă mergeMap(), mais il ignore les nouvelles valeurs tant que chaque valeur Ă©mise nâa pas terminĂ©.
import { exhaustMap } from 'rxjs/operators';
@Component(...)
export class AppComponent {
http = inject(HttpClient);
productID$ = new FormControl();
// si productID$ Ă©met avant que le http.get se termine
// alors cette émission sera ignorée
product$ = productID$.pipe(
exhaustMap(productID => this.http.get(`api/products/${productID}`))
)
}
Câest utile dans des situations oĂč vous voulez ignorer les nouvelles valeurs jusquâĂ ce que chaque valeur Ă©mise ait terminĂ© son traitement, comme lors de la gestion des clics sur un bouton pour empĂȘcher les doubles soumissions.
filter
Cet opérateur émet uniquement les valeurs qui satisfont une certaine condition.
import { of } from "rxjs";
import { filter } from "rxjs/operators";
const nums = of(1, 2, 3, 4, 5);
const evens = nums.pipe(filter((n) => n % 2 === 0));
evens.subscribe((x) => console.log(x)); // Affiche 2, 4
Il est utile lorsque vous voulez filtrer certains rĂ©sultats, par exemple en filtrant certaines entrĂ©es utilisateur ou certains rĂ©sultats dâune API.
scan
Cet opĂ©rateur fonctionne comme reduce() pour les tableaux. Il applique une fonction Ă chaque valeur Ă©mise par lâObservable source et Ă©met le rĂ©sultat cumulatif.
import { of } from "rxjs";
import { scan } from "rxjs/operators";
const nums = of(1, 2, 3);
const sum = nums.pipe(scan((acc, curr) => acc + curr, 0));
sum.subscribe((x) => console.log(x)); // Affiche 1, 3, 6
Câest utile lorsque vous voulez accumuler des valeurs au fil du temps, par exemple pour calculer une somme totale ou pour construire un tableau de valeurs Ă©mises.
distinctUntilChanged
Cet opérateur émet une valeur uniquement si elle est différente de la derniÚre valeur émise.
import { of } from "rxjs";
import { distinctUntilChanged } from "rxjs/operators";
const nums = of(1, 1, 2, 2, 3, 3);
const distinctNums = nums.pipe(distinctUntilChanged());
distinctNums.subscribe((x) => console.log(x)); // Affiche 1, 2, 3
Câest utile lorsque vous voulez ignorer les valeurs en double, par exemple lorsquâon crĂ©Ă© un strore fait-maison ou chaque selector utilise distinctUntilChanged() pour Ă©viter dâĂ©mettre Ă nouveau si la valeur est la mĂȘme.
tap
Cet opĂ©rateur est utilisĂ© pour effectuer des effets de bord. Il applique une fonction Ă chaque valeur Ă©mise par lâObservable source, mais retourne la valeur sans la modifier.
import { of } from "rxjs";
import { tap } from "rxjs/operators";
const nums = of(1, 2, 3).pipe(tap(console.log)).subscribe();
Câest utile lorsque vous voulez dĂ©clencher une action secondaire en rĂ©action Ă une valeur Ă©mise, la mise Ă jour dâun Ă©tat (loader par exemple) ou un console.log() pour regarder lâĂ©tat du flux.
delay
Permet de dĂ©caler le moment oĂč les valeurs Ă©mises par un Observable sont rĂ©ellement Ă©mises. Cela retarde essentiellement lâĂ©mission de chaque valeur dâun certain nombre de millisecondes.
import { delay } from "rxjs/operators";
const data$ = of({ some: "data" });
data$.pipe(delay(2000)).subscribe(console.log);
// affiche l'objet passé en paramÚtre mais au bout de 2 secondes
Câest utile lorsque vous voulez retarder une action, comme lâaffichage dâun message de succĂšs aprĂšs une action de lâutilisateur.
take
Cet opĂ©rateur nâĂ©met que les n premiĂšres valeurs puis complĂšte lâObservable.
import { interval } from "rxjs";
import { take } from "rxjs/operators";
const numbers = interval(1000);
const takeFourNumbers = numbers.pipe(take(4));
takeFourNumbers.subscribe(console.oog);
// Affiche la valeur quatre fois puis se complĂšte
Câest utile lorsque vous ne voulez traiter quâun certain nombre de valeurs, comme pour paginer les rĂ©sultats dâune requĂȘte ou limiter le nombre de tentatives dâune action.
takeUntil
Cet opĂ©rateur complĂšte lâObservable dĂšs quâune autre Observable Ă©met une valeur.
import { interval, of } from "rxjs";
import { takeUntil, delay } from "rxjs/operators";
const numbers = interval(1000);
const stopper = of(true).pipe(delay(3500));
const takeUntilStopped = numbers.pipe(takeUntil(stopper));
takeUntilStopped.subscribe(console.log);
// Affiche la valeur jusqu'Ă ce que stopper Ă©mette une valeur
Câest utile lorsque vous voulez arrĂȘter une action dĂšs quâun autre Ă©vĂ©nement se produit, comme arrĂȘter un intervalle lorsquâun utilisateur clique sur un bouton ou unsubscribe dâun flux.
takeWhile
Cet opĂ©rateur Ă©met des valeurs tant quâune condition est vraie et complĂšte lâObservable dĂšs que la condition devient fausse.
import { interval } from "rxjs";
import { takeWhile } from "rxjs/operators";
const numbers = interval(1000);
const takeWhileLessThanFive = numbers.pipe(takeWhile((n) => n < 5));
takeWhileLessThanFive.subscribe(console.log);
// Affiche la valeur tant que n est inférieur à 5
Câest utile lorsque vous voulez continuer Ă Ă©mettre des valeurs en fonction dâune condition, comme arrĂȘter un dĂ©filement automatique quand lâutilisateur atteint le bas de la page.
skip
Cet opĂ©rateur ignore les n premiĂšres valeurs Ă©mises par lâObservable source.
import { interval } from "rxjs";
import { skip } from "rxjs/operators";
const numbers = interval(1000);
const skipTwoNumbers = numbers.pipe(skip(2));
skipTwoNumbers.subscribe(console.log);
// Ignore les deux premiĂšres valeurs
Câest utile lorsque vous voulez ignorer les premiĂšres valeurs, comme lors de la mise en place dâune fonction de pagination.
debounceTime
Cet opĂ©rateur nâĂ©met une valeur que si un certain temps sâest Ă©coulĂ© sans quâune autre valeur soit Ă©mise.
import { debounceTime } from "rxjs/operators";
inputFormControl = new FormControl();
inputFormControl.valueChanges.pipe(debounceTime(300)).subscribe(console.log);
// Ămet une valeur 300ms aprĂšs la derniĂšre saisie de l'utilisateur
Câest utile lorsque vous voulez limiter le taux de traitement, comme pour Ă©viter de faire trop de requĂȘtes pendant la frappe de lâutilisateur dans une recherche en direct.
catchError
Cet opĂ©rateur attrape les erreurs sur lâObservable source et permet de renvoyer un nouvel Observable.
import { catchError } from 'rxjs/operators';
@Component(...)
export class AppComponent {
http = inject(HttpClient);
products$ = this.http.get('api/products').pipe(
catchError(error => {
console.error('Une erreur est survenue lors de la récupération des données :', error);
// On peut afficher un message d'erreur Ă©galement
// On retourne un Observable avec des données par défaut en cas d'erreur
return of({ data: 'Données par défaut' });
})
);
}
Câest utile lorsque vous voulez gĂ©rer les erreurs et peut-ĂȘtre remplacer les valeurs erronĂ©es par des valeurs par dĂ©faut.
retry
Cet opĂ©rateur re-souscrit Ă lâObservable source si une erreur est produite, en permettant un certain nombre de tentatives.
import { retry } from 'rxjs/operators';
@Component(...)
export class AppComponent {
http = inject(HttpClient);
products$ = this.http.get('api/products').pipe(
retry(3) // RĂ©essaye la requĂȘte HTTP 3 fois en cas d'erreur
);
}
Câest utile lorsque vous voulez rĂ©essayer une action en cas dâerreur, comme pour retenter une requĂȘte HTTP qui a Ă©chouĂ©.
startWith
Cet opĂ©rateur fait en sorte que lâObservable source commence par Ă©mettre une ou plusieurs valeurs spĂ©cifiques.
import { of } from "rxjs";
import { startWith } from "rxjs/operators";
const numbers = of(1, 2, 3);
const startWithZero = numbers.pipe(startWith(0));
startWithZero.subscribe((x) => console.log(x)); // Affiche 0, 1, 2, 3
Câest utile lorsque vous voulez fournir des valeurs initiales, comme pour prĂ©-remplir un champ de formulaire ou afficher une valeur par dĂ©faut pendant le chargement des donnĂ©es.