乐闻世界logo
搜索文章和话题

RxJS 中常用的操作符有哪些?如何使用?

2026年2月21日 16:28

常用操作符分类

1. 创建操作符 (Creation Operators)

of

创建一个发出指定值的 Observable

javascript
import { of } from 'rxjs'; of(1, 2, 3).subscribe(console.log); // 输出: 1, 2, 3

from

将数组、Promise、Iterable 等转换为 Observable

javascript
import { from } from 'rxjs'; from([1, 2, 3]).subscribe(console.log); // 输出: 1, 2, 3 from(Promise.resolve('Hello')).subscribe(console.log); // 输出: Hello

interval / timer

创建定时发出的 Observable

javascript
import { interval, timer } from 'rxjs'; interval(1000).subscribe(console.log); // 每秒发出一个递增数字: 0, 1, 2, 3... timer(2000, 1000).subscribe(console.log); // 2秒后开始,每秒发出一个数字: 0, 1, 2, 3...

2. 转换操作符 (Transformation Operators)

map

转换每个发出的值

javascript
import { of } from 'rxjs'; import { map } from 'rxjs/operators'; of(1, 2, 3).pipe( map(x => x * 2) ).subscribe(console.log); // 输出: 2, 4, 6

switchMap

取消之前的内部 Observable,只处理最新的

javascript
import { fromEvent } from 'rxjs'; import { switchMap } from 'rxjs/operators'; fromEvent(document, 'click').pipe( switchMap(() => fetch('/api/data')) ).subscribe(response => console.log(response));

mergeMap

并行处理所有内部 Observable

javascript
import { of } from 'rxjs'; import { mergeMap } from 'rxjs/operators'; of(1, 2, 3).pipe( mergeMap(x => of(x, x * 2)) ).subscribe(console.log); // 输出: 1, 2, 2, 4, 3, 6

concatMap

顺序处理内部 Observable,一个完成后再处理下一个

javascript
import { of } from 'rxjs'; import { concatMap } from 'rxjs/operators'; of(1, 2, 3).pipe( concatMap(x => of(x, x * 2)) ).subscribe(console.log); // 输出: 1, 2, 2, 4, 3, 6 (顺序执行)

3. 过滤操作符 (Filtering Operators)

filter

过滤符合条件的值

javascript
import { of } from 'rxjs'; import { filter } from 'rxjs/operators'; of(1, 2, 3, 4, 5).pipe( filter(x => x % 2 === 0) ).subscribe(console.log); // 输出: 2, 4

take / takeLast / takeUntil

只取前几个、后几个或直到某个条件

javascript
import { interval, fromEvent } from 'rxjs'; import { take, takeUntil } from 'rxjs/operators'; interval(1000).pipe( take(3) ).subscribe(console.log); // 输出: 0, 1, 2 interval(1000).pipe( takeUntil(fromEvent(document, 'click')) ).subscribe(console.log); // 点击时停止

debounceTime

在指定时间内只发出最后一个值

javascript
import { fromEvent } from 'rxjs'; import { debounceTime } from 'rxjs/operators'; fromEvent(inputElement, 'input').pipe( debounceTime(300) ).subscribe(event => console.log(event.target.value));

throttleTime

在指定时间内只发出第一个值

javascript
import { fromEvent } from 'rxjs'; import { throttleTime } from 'rxjs/operators'; fromEvent(window, 'scroll').pipe( throttleTime(200) ).subscribe(event => console.log('Scroll event'));

4. 组合操作符 (Combination Operators)

merge

合并多个 Observable,并行发出值

javascript
import { merge, interval } from 'rxjs'; merge( interval(1000).pipe(map(x => `A${x}`)), interval(1500).pipe(map(x => `B${x}`)) ).subscribe(console.log);

concat

顺序连接多个 Observable

javascript
import { concat, of } from 'rxjs'; concat( of(1, 2), of(3, 4), of(5, 6) ).subscribe(console.log); // 输出: 1, 2, 3, 4, 5, 6

combineLatest

组合多个 Observable 的最新值

javascript
import { combineLatest, of } from 'rxjs'; combineLatest([ of(1, 2, 3), of('a', 'b', 'c') ]).subscribe(([num, char]) => console.log(num, char)); // 输出: [3, 'a'], [3, 'b'], [3, 'c']

zip

按索引组合多个 Observable 的值

javascript
import { zip, of } from 'rxjs'; zip( of(1, 2, 3), of('a', 'b', 'c') ).subscribe(([num, char]) => console.log(num, char)); // 输出: [1, 'a'], [2, 'b'], [3, 'c']

5. 错误处理操作符 (Error Handling Operators)

catchError

捕获错误并返回新的 Observable

javascript
import { of } from 'rxjs'; import { map, catchError } from 'rxjs/operators'; of(1, 2, 3, 4).pipe( map(x => { if (x === 3) throw new Error('Error!'); return x; }), catchError(error => of('default value')) ).subscribe(console.log); // 输出: 1, 2, 'default value'

retry

重试失败的 Observable

javascript
import { of } from 'rxjs'; import { map, retry } from 'rxjs/operators'; let count = 0; of(1, 2, 3).pipe( map(x => { count++; if (count < 3) throw new Error('Error!'); return x; }), retry(2) ).subscribe(console.log);

6. 工具操作符 (Utility Operators)

tap

执行副作用,不修改值

javascript
import { of } from 'rxjs'; import { tap, map } from 'rxjs/operators'; of(1, 2, 3).pipe( tap(x => console.log('Before:', x)), map(x => x * 2), tap(x => console.log('After:', x)) ).subscribe();

delay

延迟发出值

javascript
import { of } from 'rxjs'; import { delay } from 'rxjs/operators'; of(1, 2, 3).pipe( delay(1000) ).subscribe(console.log); // 1秒后输出: 1, 2, 3

实际应用示例

搜索框防抖

javascript
fromEvent(searchInput, 'input').pipe( debounceTime(300), map(event => event.target.value), filter(query => query.length > 2), switchMap(query => searchAPI(query)) ).subscribe(results => displayResults(results));

自动保存

javascript
form.valueChanges.pipe( debounceTime(1000), distinctUntilChanged(), switchMap(formData => saveAPI(formData)) ).subscribe();

并行请求

javascript
merge( getUserData(userId), getUserPosts(userId), getUserComments(userId) ).pipe( combineLatestAll() ).subscribe(([user, posts, comments]) => { renderUserProfile(user, posts, comments); });

最佳实践

  1. 合理使用 pipe() 链式调用
  2. 注意操作符的执行顺序
  3. 及时取消订阅避免内存泄漏
  4. 根据场景选择合适的操作符
  5. 使用 TypeScript 获得更好的类型推断
标签:Rxjs