Higher-Order Observable Mapping w RxJS

Maciej Woźnica | Rozwój oprogramowania | 01.03.2023

Biblioteka RxJs, którą szczegółowo opisałem w poprzednim artykule, oferuje szereg możliwości pracy z danymi, reagowania na zachowania użytkownika czy wszelkie inne zdarzenia zachodzące w aplikacji. W skrócie: RxJs to biblioteka ułatwiająca wykorzystanie konceptu programowania reaktywnego. Jednym z takich ułatwień są operatory wyższego rzędu. Z tego artykułu dowiesz się, czym jest Higher-Order Observable Mapping w RxjS, jakie są rodzaje operatorów i jak mogą ci pomóc w pracy ze strumieniowymi danymi. Zaczynajmy!

Observable oraz strumień danych

Jednymi z najważniejszych elementów biblioteki RxJS są Observable oraz strumienie danych.

Oba te pojęcia są podobne, jednak występują między nimi subtelne różnice:

Strumienie danych  Observable 
Strumienie danych są reprezentowane przez klasę Observable w bibliotece RxJS. Observable może też reprezentować pojedyncze wartości, a nie tylko strumienie. 
Mają węższy zakres i pozwalają na manipulacje na danych i kontrolowanie ich w ograniczony sposób. Ma szerszy zakres zastosowań niż strumienie danych, ponieważ pozwalają na manipulowanie strumieniami i kontrolowanie ich w bardziej zaawansowany sposób.   
Strumienie danych nie pozwalają na multipleksowanie, łączenie, filtrowanie i mapowanie danych. Observable pozwala na wykonanie pewnych operacji, takich jak multipleksowanie, łączenie, filtrowanie i mapowanie, co pozwala na bardziej zaawansowaną pracę z danymi.   
Strumienie danych to sekwencje wartości, które są emitowane przez źródło. Observable to obiekt, który służy do zarządzania strumieniem danych.   
Nie można za ich pomocą kontrolować czasu przepływu wartości czy sterować błędami. Ma dodatkowe cechy pozwalające np. na kontrolowanie czasu przepływu wartości lub sterowanie błędami.   
Strumienie danych są sekwencjami wartości emitowanymi przez źródło. Observable to obiekt, który zarządza strumieniami danych i pozwala na manipulowanie nimi. 

Podsumowując, Observable mają szerszy zakres zastosowań niż strumienie danych i mają dodatkowe funkcje, pozwalające na kontrolowanie czasu przepływu wartości i sterowanie błędami.

Operatory w RxJS

Prawdziwą potęgę biblioteki RxJS stanowią operatory. Operatory pozwalają przekształcać, filtrować, łączyć strumienie danych i zarządzać nimi. RxJS dostarcza szereg operatorów, takich jak: map, filter, reduce, debounceTime, distinctUntilChanged itp., które pozwalają na przetwarzanie danych w strumieniach. Wybór odpowiedniego operatora jest ważny, gdyż źle dobrany operator może wpłynąć na funkcjonowanie systemu. Niżej omawiam najważniejsze z nich.

Higher-Order Observable Mapping

Biblioteka RxJS zawiera szczególny typ operatorów, tzw. higher-order operators, czyli operatory wyższego rzędu, które pozwalają na złożone manipulowanie strumieniami danych. Higher-order operators to operatory, które zawierają w argumentach inne operatory lub Observable i zwracają nowe Observable mające zastosowanie do pierwotnego strumienia danych.

Istnienie higher-order operators w bibliotece RxJS jest uzasadnione z kilku powodów:

  • Zapewniają one prostsze i bardziej czytelne sposoby manipulowania strumieniami danych. Dzięki nim można złożyć wiele operacji na strumieniu w jeden łańcuch, co ułatwia czytanie i zrozumienie kodu.
  • Umożliwiają bardziej elastyczne manipulowanie strumieniami danych. Wiele higher-order operators pozwala na dynamiczne tworzenie nowych strumieni danych w zależności od zdarzeń zachodzących w pierwotnym strumieniu. Dzięki temu można łatwo dostosować przetwarzanie danych do zmieniających się wymagań.
  • Higher-order operators pozwalają na wielokrotne wykorzystanie kodu. Wiele operacji na strumieniach danych jest podobnych i można je zastosować w wielu miejscach w aplikacji. Higher-order operators ułatwiają użycie tych samych operacji w różnych częściach kodu, co pozwala na uniknięcie powtórzeń.

Podsumowując, higher-order operators w bibliotece RxJS pozwalają na bardziej elastyczne, czytelne i wielokrotne wykorzystanie kodu do manipulowania strumieniami danych. Dzięki nim programiści mogą złożyć wiele operacji na strumieniu w jeden łańcuch, co ułatwia zarządzanie danymi w aplikacji.

RxJS udostępnia nam cztery operatory wyższego rzędu:

SwitchMap

Operator switchMap emituje wartości wewnętrznego Observable, ale każda nowa wartość emitowana przez strumień źródłowy powoduje anulowanie poprzedniego wewnętrznego Observable i utworzenie nowego. Wewnętrzny Observable zwrócony z funkcji mapującej jest zastępowany nowym wewnętrznym Observable, co oznacza, że jeśli źródłowy strumień emituje wartości z częstotliwością wyższą niż wewnętrzny Observable, operator switchMap przerywa działanie i zastępuje poprzedni wewnętrzny Observable. SwitchMap możemy zastosować w przypadku, gdy chcemy odwołać wcześniejsze żądania i wysłać tylko najnowsze żądanie, np. dotyczące filtrowania wyników wyszukiwania.

import { fromEvent } from 'rxjs'; 

import { switchMap } from 'rxjs/operators'; 

 

Pobieranie danych z serwera 

function fetchData(keyword: string) { 

  return fetch(`https://api.github.com/search/repositories?q=${keyword}`).then(res => res.json()); 

} 
 

Strumień wejściowy z danymi z pola tekstowego

const input$ = fromEvent(document.querySelector('input'), 'input'); 

Operator switchMap kończy subskrypcję poprzedniego strumienia i zwraca tylko strumień wyjściowy ostatniego wyemitowanego strumienia

const result$ = input$.pipe( 

  switchMap(event => fetchData(event.target.value)) 

); 

Subskrypcja strumienia wyjściowego

result$.subscribe( 

  data => console.log(data), 

  err => console.error(err), 

  () => console.log('Complete') 

); 

W tym przykładzie operator switchMap przekształca każdą wartość strumienia wejściowego (event z pola tekstowego) na strumień wyjściowy, który pobiera dane z serwera GitHub API na podstawie frazy wyszukiwania. Dzięki temu, kiedy użytkownik wprowadza nową frazę, operator switchMap anuluje subskrypcję poprzedniego strumienia wyjściowego i zwraca tylko wynik dla ostatniej frazy.

MergeMap

Operator mergeMap przetwarza wszystkie wartości w strumieniu źródłowym jednocześnie, emitując wartości wewnętrzne Observable. Operator tworzy wiele wewnętrznych Observables, co oznacza, że wynikowy strumień wyjściowy może emitować wartości w dowolnej kolejności. Przykładowym zastosowaniem operatora mergeMap może być równoległe pobieranie danych z wielu źródeł.

Operator mergeMap jest szczególnie przydatny w sytuacjach, gdy potrzebujemy wykonać operację asynchroniczną dla każdego elementu strumienia wejściowego i uzyskać strumień wyjściowy składający się z wyników tych operacji.

import { from, Observable } from 'rxjs'; 

import { mergeMap } from 'rxjs/operators'; 

Definicja funkcji zwracającej strumień asynchroniczny

function makeHttpRequest(id: number): Observable {

return fetch(https://jsonplaceholder.typicode.com/posts/${id}).then(res => res.json());

}

Strumień wejściowy z listą identyfikatorów

const ids$ = from([1, 2, 3, 4, 5]); 

 

Operator mergeMap przekształca każdy element strumienia wejściowego na strumień wyjściowy

const posts$ = ids$.pipe( 

  mergeMap(id => makeHttpRequest(id)) 

); 

Subskrypcja strumienia wyjściowego

posts$.subscribe( 

  post => console.log(post), 

  err => console.error(err), 

  () => console.log('Complete') 

); 

W tym przykładzie operator mergeMap przekształca każdy element strumienia wejściowego (identyfikator posta) na strumień wyjściowy reprezentujący wynik asynchronicznej operacji pobierania danych z serwera dla danego posta. W efekcie otrzymujemy strumień wyjściowy składający się z pobranych postów, a nie z identyfikatorów.

ContactMap

Operator concatMap emituje wartości wewnętrznego Observable w kolejności, w jakiej są one dostarczane. Operator nie tworzy wielu wewnętrznych Observables, dopóki poprzedni Observable nie zakończy działania. W przypadku, gdy w strumieniu źródłowym pojawi się kolejna wartość, zanim zakończy działanie poprzedni wewnętrzny Observable. Operator concatMap umieszcza wartości kolejnego wewnętrznego Observable w kolejce i przetwarza je dopiero po zakończeniu poprzedniego Observable.

import { of } from 'rxjs'; 

import { concatMap, delay } from 'rxjs/operators'; 

Strumień wejściowy z liczbami

const source$ = of(1, 2, 3, 4, 5); 

Operator concatMap przekształca każdy element strumienia wejściowego na nowy strumień wyjściowy z opóźnieniem w zależności od wartości elementu

const example$ = source$.pipe( 

  concatMap(value => of(`Delayed by: ${value * 1000}ms`).pipe(delay(value * 1000))) 

); 

Subskrypcja strumienia wyjściowego

example$.subscribe(console.log); 

W powyższym przykładzie concatMap przekształca każdą wartość strumienia wejściowego (1, 2, 3, 4, 5) na strumień wyjściowy, który zostaje opóźniony o wartość elementu w sekundach i zwrócony w kolejności ich pojawiania się w źródłowym strumieniu. Dzięki temu pierwszy element (1) zostanie zwrócony po jednej sekundzie, drugi element (2) po dwóch sekundach itd.

ExhaustMap

Operator exhaustMap przekształca każdy element strumienia wejściowego (źródłowego) na nowy strumień i ignoruje wszystkie kolejne elementy, zanim zakończy działanie strumień wyjściowy.

ExhaustMap jest przydatny, gdy jest nam potrzebne, żeby w danym czasie był emitowany tylko jeden strumień wyjściowy. W przypadku, gdy pojawiają się nowe elementy w strumieniu wejściowym przed zakończeniem strumienia wyjściowego, są one ignorowane aż do zakończenia działania poprzedniego strumienia wyjściowego.

Przykładowym zastosowaniem operatora exhaustMap może być ograniczenie liczby żądań sieciowych w sytuacji, gdy w aplikacji użytkownik kliknie wiele razy na przycisk, ale zwrócone zostanie tylko jedno zapytanie.

 

import { interval } from 'rxjs'; 

import { exhaustMap, take } from 'rxjs/operators'; 

Strumień wejściowy z wartościami

const source$ = interval(1000).pipe(take(4)); 

Operator exhaustMap przekształca każdą wartość strumienia wejściowego na nowy strumień wyjściowy, który zostaje zwrócony dopiero po zakończeniu działania jego poprzednika. 

const example$ = source$.pipe( 

  exhaustMap(value => interval(500).pipe(take(3))) 

); 

Subskrypcja strumienia wyjściowego 

example$.subscribe(console.log); 

Powyższy przykład obrazuje sytuacje, gdy operator exhaustMap przekształca każdą wartość strumienia wejściowego (0, 1, 2, 3) na strumień wyjściowy emitujący wartości z opóźnieniem 0,5 sekundy. Każdy kolejny strumień wyjściowy zostanie zignorowany, dopóki poprzedni nie zostanie zakończony, dlatego drugi strumień wyjściowy z wartościami (0, 1, 2) nie zostanie wyemitowany, ponieważ pierwszy strumień z wartościami (0, 1, 2) nadal jest w trakcie emisji.

Podsumowując:

  • exhaustMap ignoruje nowe zdarzenia, jeśli obecny strumień wciąż trwa. Jeśli w trakcie trwania strumienia wejściowego operator otrzyma kolejne zdarzenie, to zostanie ono pominięte, aż do momentu, gdy strumień wejściowy się zakończy.
  • mergeMap przetwarza wszystkie wartości w strumieniu źródłowym jednocześnie, emitując wartości wewnętrznych Observables.
  • concatMap emituje wartości wewnętrznego Observable w kolejności, w jakiej są one emitowane.
  • switchMap emituje wartości wewnętrznego Observable, ale zastępuje poprzedni wewnętrzny Observable nowym, gdy źródłowy strumień emituje nowe wartości.

FAQ:  

Czym jest RxJS?

RxJS to biblioteka programistyczna napisana w języku JavaScript, która implementuje wzorzec programowania reaktywnego, oparty na strumieniach danych. RxJS umożliwia programowanie reaktywne w języku JavaScript.

Biblioteka RxJS zapewnia programistom szereg narzędzi i operacji do tworzenia, przetwarzania i obsługi strumieni danych. Umożliwia tworzenie i łączenie strumieni z różnych źródeł, np. interakcji użytkownika czy zapytań HTTP. RxJS dostarcza także wiele operatorów, które umożliwiają transformowanie, filtrowanie i łączenie strumieni danych.

RxJS ma wiele zastosowań, takich jak obsługa zdarzeń interakcji użytkownika w aplikacjach internetowych, tworzenie strumieni danych z serwera lub integracja z frameworkami frontendowymi, takimi jak Angular czy React.

Czym jest strumień danych w programowaniu reaktywnym?

W programowaniu reaktywnym strumień danych to sekwencja zdarzeń, która może mieć miejsce w czasie i być przetwarzana asynchronicznie. Może to być na przykład strumień zdarzeń z interfejsu użytkownika, strumień danych z sieci, pliku, bazy danych lub innego źródła. Strumienie danych są kluczowym elementem w programowaniu reaktywnym, ponieważ pozwalają na przetwarzanie zdarzeń i reagowanie na nie w czasie rzeczywistym, w miarę ich pojawiania się. Strumienie mogą mieć zero lub wiele wartości. W bibliotece RxJS operatorami są funkcje, które pozwalają na pracę z asynchronicznymi strumieniami danych. Dzięki wykorzystaniu operatorów i strumieni danych programowanie reaktywne umożliwia tworzenie skalowalnych, elastycznych i wydajnych aplikacji. Szerzej zostało to opisane w artykule „Programowanie reaktywne w JS z RxJS”.

Jak efektywniej wykorzystać bibliotekę RxJS?

Używaj Observables zamiast Promises – Observables są bardziej elastyczne niż Promises i pozwalają na łatwe operowanie na strumieniu danych.

– Unikaj nadmiernego korzystania z operatorów – operatorów w RxJS jest wiele i bardzo łatwo jest przesadzić z ich wykorzystaniem. Zawsze należy zastanowić się, czy dany operator jest potrzebny i czy faktycznie przyczynia się do poprawy czytelności i wydajności kodu.

– Unikaj jednoczesnego korzystania z wielu strumieni – w RxJS bardzo łatwo jest stworzyć wiele strumieni danych, co może prowadzić do problemów z wydajnością aplikacji. Zawsze należy zastanowić się, czy rzeczywiście potrzebujemy tak wielu strumieni i czy nie da się ich połączyć w jeden.

– Zwracaj uwagę na pamięć – w RxJS strumienie danych są połączone z pamięcią, więc warto zwracać uwagę na to, czy nie tworzymy niepotrzebnie dużych strumieni, które mogą powodować problemy z wydajnością.

– Korzystaj z operatorów wysokiego rzędu – operator wysokiego rzędu w RxJS pozwala na tworzenie bardziej skomplikowanych strumieni danych. Ich umiejętne wykorzystanie może znacznie ułatwić pracę i poprawić czytelność kodu.

– Testuj kod – RxJS jest bardzo potężną biblioteką, ale wymaga ona również więcej uwagi podczas testowania. Zawsze warto pamiętać o testowaniu swojego kodu.

Powyższe porady są ogólne i zależą od konkretnych wymagań projektu. W każdym przypadku warto jednak pamiętać o zachowaniu czytelności i przejrzystości kodu, a także o testowaniu swojego rozwiązania.

Czym jest typ obiektu? Dlaczego musisz go zasubskrybować, aby otrzymać dane?

Typ obiektu w RxJS to abstrakcyjny typ danych, który reprezentuje strumień wartości emitowanych przez źródło. Typ ten definiuje interfejs do tworzenia, przetwarzania i subskrybowania strumieni danych w programowaniu reaktywnym.

Ważną cechą strumienia danych w RxJS jest to, że nie emituje on danych do subskrybenta automatycznie. W przeciwieństwie do klasycznych zapytań HTTP, które zwracają wynik tylko raz, strumienie danych mogą emitować wartości wielokrotnie w czasie. Aby otrzymać wartości emitowane przez strumień danych, musisz zasubskrybować go, czyli utworzyć subskrypcję. Subskrypcja reprezentuje połączenie pomiędzy subskrybentem a źródłem danych, które umożliwia odbieranie wartości emitowanych przez strumień.

Subskrypcja umożliwia również zarządzanie cyklem życia strumienia danych. Możesz anulować subskrypcję, jeśli nie potrzebujesz już otrzymywać wartości z danego strumienia. Możesz również wykonywać różne operacje na strumieniu, takie jak transformacje, filtrowanie lub łączenie z innymi strumieniami, poprzez użycie operatorów RxJS.

Podsumowując, subskrypcja jest niezbędna, aby otrzymać wartości emitowane przez strumień danych w RxJS. Dzięki temu, że subskrypcje umożliwiają zarządzanie cyklem życia strumienia, można w łatwy sposób manipulować przepływem danych w programowaniu reaktywnym.

2021.09.15 JPro cover 2 1 - Higher-Order Observable Mapping w RxJS

Styled components oraz useContext

Użycie hooków w nowoczesnych aplikacjach frontendowych

Przeczytaj artykuł