RxJS 中常用的操作符有哪些?如何使用?
常用操作符分类1. 创建操作符 (Creation Operators)of创建一个发出指定值的 Observableimport { of } from 'rxjs';of(1, 2, 3).subscribe(console.log);// 输出: 1, 2, 3from将数组、Promise、Iterable 等转换为 Observableimport { from } from 'rxjs';from([1, 2, 3]).subscribe(console.log);// 输出: 1, 2, 3from(Promise.resolve('Hello')).subscribe(console.log);// 输出: Hellointerval / timer创建定时发出的 Observableimport { interval, timer } from 'rxjs';interval(1000).subscribe(console.log);// 每秒发出一个递增数字: 0, 1, 2, 3...timer(2000, 1000).subscribe(console.log);// 2秒后开始,每秒发出一个数字: 0, 1, 2, 3...2. 转换操作符 (Transformation Operators)map转换每个发出的值import { of } from 'rxjs';import { map } from 'rxjs/operators';of(1, 2, 3).pipe( map(x => x * 2)).subscribe(console.log);// 输出: 2, 4, 6switchMap取消之前的内部 Observable,只处理最新的import { fromEvent } from 'rxjs';import { switchMap } from 'rxjs/operators';fromEvent(document, 'click').pipe( switchMap(() => fetch('/api/data'))).subscribe(response => console.log(response));mergeMap并行处理所有内部 Observableimport { of } from 'rxjs';import { mergeMap } from 'rxjs/operators';of(1, 2, 3).pipe( mergeMap(x => of(x, x * 2))).subscribe(console.log);// 输出: 1, 2, 2, 4, 3, 6concatMap顺序处理内部 Observable,一个完成后再处理下一个import { of } from 'rxjs';import { concatMap } from 'rxjs/operators';of(1, 2, 3).pipe( concatMap(x => of(x, x * 2))).subscribe(console.log);// 输出: 1, 2, 2, 4, 3, 6 (顺序执行)3. 过滤操作符 (Filtering Operators)filter过滤符合条件的值import { of } from 'rxjs';import { filter } from 'rxjs/operators';of(1, 2, 3, 4, 5).pipe( filter(x => x % 2 === 0)).subscribe(console.log);// 输出: 2, 4take / takeLast / takeUntil只取前几个、后几个或直到某个条件import { interval, fromEvent } from 'rxjs';import { take, takeUntil } from 'rxjs/operators';interval(1000).pipe( take(3)).subscribe(console.log);// 输出: 0, 1, 2interval(1000).pipe( takeUntil(fromEvent(document, 'click'))).subscribe(console.log);// 点击时停止debounceTime在指定时间内只发出最后一个值import { fromEvent } from 'rxjs';import { debounceTime } from 'rxjs/operators';fromEvent(inputElement, 'input').pipe( debounceTime(300)).subscribe(event => console.log(event.target.value));throttleTime在指定时间内只发出第一个值import { fromEvent } from 'rxjs';import { throttleTime } from 'rxjs/operators';fromEvent(window, 'scroll').pipe( throttleTime(200)).subscribe(event => console.log('Scroll event'));4. 组合操作符 (Combination Operators)merge合并多个 Observable,并行发出值import { merge, interval } from 'rxjs';merge( interval(1000).pipe(map(x => `A${x}`)), interval(1500).pipe(map(x => `B${x}`))).subscribe(console.log);concat顺序连接多个 Observableimport { concat, of } from 'rxjs';concat( of(1, 2), of(3, 4), of(5, 6)).subscribe(console.log);// 输出: 1, 2, 3, 4, 5, 6combineLatest组合多个 Observable 的最新值import { combineLatest, of } from 'rxjs';combineLatest([ of(1, 2, 3), of('a', 'b', 'c')]).subscribe(([num, char]) => console.log(num, char));// 输出: [3, 'a'], [3, 'b'], [3, 'c']zip按索引组合多个 Observable 的值import { zip, of } from 'rxjs';zip( of(1, 2, 3), of('a', 'b', 'c')).subscribe(([num, char]) => console.log(num, char));// 输出: [1, 'a'], [2, 'b'], [3, 'c']5. 错误处理操作符 (Error Handling Operators)catchError捕获错误并返回新的 Observableimport { of } from 'rxjs';import { map, catchError } from 'rxjs/operators';of(1, 2, 3, 4).pipe( map(x => { if (x === 3) throw new Error('Error!'); return x; }), catchError(error => of('default value'))).subscribe(console.log);// 输出: 1, 2, 'default value'retry重试失败的 Observableimport { of } from 'rxjs';import { map, retry } from 'rxjs/operators';let count = 0;of(1, 2, 3).pipe( map(x => { count++; if (count < 3) throw new Error('Error!'); return x; }), retry(2)).subscribe(console.log);6. 工具操作符 (Utility Operators)tap执行副作用,不修改值import { of } from 'rxjs';import { tap, map } from 'rxjs/operators';of(1, 2, 3).pipe( tap(x => console.log('Before:', x)), map(x => x * 2), tap(x => console.log('After:', x))).subscribe();delay延迟发出值import { of } from 'rxjs';import { delay } from 'rxjs/operators';of(1, 2, 3).pipe( delay(1000)).subscribe(console.log);// 1秒后输出: 1, 2, 3实际应用示例搜索框防抖fromEvent(searchInput, 'input').pipe( debounceTime(300), map(event => event.target.value), filter(query => query.length > 2), switchMap(query => searchAPI(query))).subscribe(results => displayResults(results));自动保存form.valueChanges.pipe( debounceTime(1000), distinctUntilChanged(), switchMap(formData => saveAPI(formData))).subscribe();并行请求merge( getUserData(userId), getUserPosts(userId), getUserComments(userId)).pipe( combineLatestAll()).subscribe(([user, posts, comments]) => { renderUserProfile(user, posts, comments);});最佳实践合理使用 pipe() 链式调用注意操作符的执行顺序及时取消订阅避免内存泄漏根据场景选择合适的操作符使用 TypeScript 获得更好的类型推断