乐闻世界logo
搜索文章和话题

Rxjs

一个 JavaScript 库,使用可观察量进行反应式编程,处理异步数据调用、回调和基于事件的程序。
Rxjs
查看更多相关内容
RxJS 中 Hot Observable 和 Cold Observable 有什么区别?## Hot Observable vs Cold Observable ### Cold Observable(冷 Observable) **定义**: Cold Observable 是惰性的,每个订阅者都会独立执行 Observable 的逻辑。 **特点**: - 每个订阅者都会获得独立的数据流 - 订阅时才开始执行 - 不共享数据 - 生产者不会主动推送数据 **示例**: ```javascript import { Observable } from 'rxjs'; const cold$ = new Observable(subscriber => { console.log('Observable executed'); subscriber.next(Math.random()); subscriber.complete(); }); cold$.subscribe(value => console.log('Subscriber 1:', value)); // Observable executed // Subscriber 1: 0.123456 cold$.subscribe(value => console.log('Subscriber 2:', value)); // Observable executed // Subscriber 2: 0.789012 // 注意:每次订阅都重新执行,产生不同的随机数 ``` **常见的 Cold Observable**: - `of()` - `from()` - `interval()` - `timer()` - `ajax()` - `http.get()` (Angular) - 大多数创建操作符 ### Hot Observable(热 Observable) **定义**: Hot Observable 是主动的,多个订阅者共享同一个数据流。 **特点**: - 所有订阅者共享同一个数据流 - 即使没有订阅者也会执行 - 共享数据 - 生产者主动推送数据 **示例**: ```javascript import { Observable, Subject } from 'rxjs'; const hot$ = new Observable(subscriber => { console.log('Observable executed'); subscriber.next(Math.random()); subscriber.complete(); }); const subject = new Subject(); hot$.subscribe(subject); subject.subscribe(value => console.log('Subscriber 1:', value)); // Observable executed // Subscriber 1: 0.123456 subject.subscribe(value => console.log('Subscriber 2:', value)); // Subscriber 2: 0.123456 // 注意:两个订阅者收到相同的值 ``` **常见的 Hot Observable**: - `Subject` 及其变体 - `BehaviorSubject` - `ReplaySubject` - `AsyncSubject` - DOM 事件(通过 `fromEvent`) - WebSocket 连接 - `share()` 转换后的 Observable ## 转换方法 ### 1. 使用 share() 将 Cold 转换为 Hot ```javascript import { interval } from 'rxjs'; import { share, take } from 'rxjs/operators'; const cold$ = interval(1000).pipe( take(5) ); const hot$ = cold$.pipe( share() // 转换为 Hot Observable ); hot$.subscribe(value => console.log('Subscriber 1:', value)); hot$.subscribe(value => console.log('Subscriber 2:', value)); // 两个订阅者共享同一个数据流 ``` ### 2. 使用 shareReplay() 缓存值 ```javascript import { interval } from 'rxjs'; import { shareReplay, take } from 'rxjs/operators'; const hot$ = interval(1000).pipe( take(5), shareReplay(1) // 缓存最后一个值 ); hot$.subscribe(value => console.log('Subscriber 1:', value)); setTimeout(() => { hot$.subscribe(value => console.log('Subscriber 2:', value)); // 新订阅者会立即收到缓存的值 }, 3000); ``` ### 3. 使用 publish() 和 connect() ```javascript import { interval } from 'rxjs'; import { publish, take } from 'rxjs/operators'; const cold$ = interval(1000).pipe( take(5) ); const hot$ = cold$.pipe( publish() // 转换为 Hot Observable ); hot$.subscribe(value => console.log('Subscriber 1:', value)); hot$.subscribe(value => console.log('Subscriber 2:', value)); hot$.connect(); // 开始执行 ``` ## 实际应用场景 ### Cold Observable 适用场景 1. **HTTP 请求** ```javascript // 每次订阅都会发起新的请求 http.get('/api/data').subscribe(data => { console.log('Request 1:', data); }); http.get('/api/data').subscribe(data => { console.log('Request 2:', data); }); ``` 2. **独立的数据处理** ```javascript // 每个订阅者需要独立的数据流 of(1, 2, 3).pipe( map(x => x * 2) ).subscribe(value => console.log(value)); ``` 3. **需要重新执行的场景** ```javascript // 每次订阅都重新计算 const calculation$ = new Observable(subscriber => { const result = expensiveCalculation(); subscriber.next(result); subscriber.complete(); }); ``` ### Hot Observable 适用场景 1. **共享数据** ```javascript // 多个组件共享同一个数据流 const userData$ = http.get('/api/user').pipe( share() ); component1.userData$.subscribe(user => { console.log('Component 1:', user); }); component2.userData$.subscribe(user => { console.log('Component 2:', user); }); // 只发起一次请求,两个组件共享结果 ``` 2. **事件流** ```javascript // 多个订阅者监听同一个事件 const click$ = fromEvent(document, 'click').pipe( share() ); click$.subscribe(event => { console.log('Handler 1:', event); }); click$.subscribe(event => { console.log('Handler 2:', event); }); ``` 3. **WebSocket 连接** ```javascript // 多个订阅者共享同一个 WebSocket 连接 const socket$ = webSocket('ws://localhost:8080').pipe( share() ); socket$.subscribe(message => { console.log('Handler 1:', message); }); socket$.subscribe(message => { console.log('Handler 2:', message); }); ``` ## 性能对比 ### Cold Observable 性能特点 **优点**: - 每个订阅者获得独立的数据流 - 不会相互影响 - 适合需要独立处理的场景 **缺点**: - 可能重复执行相同的操作 - 浪费资源(如重复的 HTTP 请求) - 内存占用可能更高 ### Hot Observable 性能特点 **优点**: - 共享数据流,避免重复执行 - 节省资源(如只发起一次 HTTP 请求) - 内存占用更低 **缺点**: - 订阅者可能错过之前的数据 - 需要管理订阅时机 - 可能出现竞态条件 ## 选择指南 ### 使用 Cold Observable 当: - 每个订阅者需要独立的数据流 - 需要重新执行操作 - 订阅者之间不应该相互影响 - 数据源是按需生成的 ### 使用 Hot Observable 当: - 多个订阅者需要共享数据 - 需要避免重复执行(如 HTTP 请求) - 数据源是主动推送的(如事件、WebSocket) - 需要缓存数据供后续订阅者使用 ## 最佳实践 ### 1. HTTP 请求共享 ```javascript // ❌ 错误:每次订阅都发起请求 class UserService { getUser(id: string) { return http.get(`/api/users/${id}`); } } // ✅ 正确:共享请求结果 class UserService { private cache = new Map<string, Observable<User>>(); getUser(id: string) { if (!this.cache.has(id)) { this.cache.set(id, http.get(`/api/users/${id}`).pipe( shareReplay(1) )); } return this.cache.get(id)!; } } ``` ### 2. 事件处理 ```javascript // 使用 share() 共享事件流 const resize$ = fromEvent(window, 'resize').pipe( debounceTime(200), share() ); resize$.subscribe(event => { updateLayout1(event); }); resize$.subscribe(event => { updateLayout2(event); }); ``` ### 3. 状态管理 ```javascript // 使用 BehaviorSubject 管理状态 const state$ = new BehaviorSubject(initialState); state$.subscribe(state => { console.log('Listener 1:', state); }); state$.subscribe(state => { console.log('Listener 2:', state); }); // 更新状态 state$.next(newState); ``` ## 常见陷阱 ### 1. 忘记共享导致重复请求 ```javascript // ❌ 错误示例 const data$ = http.get('/api/data'); data$.subscribe(data => console.log('Component 1:', data)); data$.subscribe(data => console.log('Component 2:', data)); // 发起两次请求 // ✅ 正确示例 const data$ = http.get('/api/data').pipe( share() ); data$.subscribe(data => console.log('Component 1:', data)); data$.subscribe(data => console.log('Component 2:', data)); // 只发起一次请求 ``` ### 2. 错误的共享时机 ```javascript // ❌ 错误示例 const data$ = http.get('/api/data').pipe( share() ); // 立即订阅触发请求 data$.subscribe(); // 后续订阅者可能错过数据 setTimeout(() => { data$.subscribe(data => console.log(data)); }, 2000); // ✅ 正确示例 const data$ = http.get('/api/data').pipe( shareReplay(1) // 缓存数据 ); ``` ### 3. 不当使用 shareReplay ```javascript // ❌ 错误示例:缓存过多数据 const data$ = interval(1000).pipe( shareReplay(1000) // 缓存1000个值,占用大量内存 ); // ✅ 正确示例:合理设置缓存大小 const data$ = interval(1000).pipe( shareReplay(1) // 只缓存最后一个值 ); ``` ## 总结 理解 Hot 和 Cold Observable 的区别对于编写高效的 RxJS 代码至关重要: 1. **Cold Observable**: 惰性、独立执行、适合按需生成的数据 2. **Hot Observable**: 主动、共享执行、适合主动推送的数据 3. **转换方法**: 使用 `share()`、`shareReplay()` 等操作符进行转换 4. **性能考虑**: Hot Observable 可以避免重复执行,提高性能 5. **选择原则**: 根据场景选择合适的类型,避免不必要的资源浪费 正确使用这两种 Observable 类型,可以显著提升应用的性能和可维护性。
前端 · 2026年2月21日 16:59
RxJS 中的 Marble Testing 是什么?如何使用?## Marble Testing 的概念 Marble Testing 是 RxJS 中一种基于字符串的可视化测试方法,它使用特殊的语法来表示 Observable 的时间流和事件。这种方法让异步测试变得直观和易于理解。 ## Marble 语法 ### 基本符号 | 符号 | 含义 | |------|------| | `-` | 时间流逝(1帧,约10ms)| | `a`, `b`, `c` | 发出的值 | | `|` | 完成 | | `#` | 错误 | | `()` | 同步发出 | | `^` | 订阅点(hot Observable)| | `!` | 取消订阅 | ### 示例 ```javascript // 基本示例 const source$ = cold('-a-b-c-|'); // 含义:10ms后发出a,20ms后发出b,30ms后发出c,40ms后完成 // 错误示例 const error$ = cold('-a-b-#'); // 含义:10ms后发出a,20ms后发出b,30ms后出错 // 同步示例 const sync$ = cold('(abc|)'); // 含义:同步发出a、b、c,然后完成 // Hot Observable const hot$ = hot('^-a-b-c-|'); // 含义:从订阅点开始,10ms后发出a,20ms后发出b,30ms后发出c,40ms后完成 ``` ## TestScheduler 的使用 ### 基本设置 ```javascript import { TestScheduler } from 'rxjs/testing'; describe('My Observable Tests', () => { let testScheduler: TestScheduler; beforeEach(() => { testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); }); }); ``` ### 测试基本操作符 ```javascript import { of } from 'rxjs'; import { map, filter } from 'rxjs/operators'; it('should map values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe( map(x => x.toUpperCase()) ); expectObservable(result$).toBe(expected, { a: 'a', b: 'b', c: 'c' }); }); }); it('should filter values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-d-|'); const expected = '-a-c---|'; const result$ = source$.pipe( filter(x => ['a', 'c'].includes(x)) ); expectObservable(result$).toBe(expected); }); }); ``` ### 测试时间相关操作符 ```javascript import { of } from 'rxjs'; import { delay, debounceTime, throttleTime } from 'rxjs/operators'; it('should delay emissions', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '---a-b-c-|'; // 延迟30ms const result$ = source$.pipe( delay(30, testScheduler) ); expectObservable(result$).toBe(expected); }); }); it('should debounce', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a--b---c-|'); const expected = '-----b---c-|'; const result$ = source$.pipe( debounceTime(20, testScheduler) ); expectObservable(result$).toBe(expected); }); }); it('should throttle', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-d-|'); const expected = '-a---c---|'; const result$ = source$.pipe( throttleTime(30, testScheduler) ); expectObservable(result$).toBe(expected); }); }); ``` ### 测试组合操作符 ```javascript import { of, merge, concat, combineLatest } from 'rxjs'; it('should merge observables', () => { testScheduler.run(({ cold, expectObservable }) => { const source1$ = cold('-a---b-|'); const source2$ = cold('--c-d---|'); const expected = '-a-c-b-d-|'; const result$ = merge(source1$, source2$); expectObservable(result$).toBe(expected); }); }); it('should concatenate observables', () => { testScheduler.run(({ cold, expectObservable }) => { const source1$ = cold('-a-b-|'); const source2$ = cold('--c-d-|'); const expected = '-a-b--c-d-|'; const result$ = concat(source1$, source2$); expectObservable(result$).toBe(expected); }); }); it('should combine latest', () => { testScheduler.run(({ cold, expectObservable }) => { const source1$ = cold('-a---b-|'); const source2$ = cold('--c-d---|'); const expected = '----ab-bd-|'; const result$ = combineLatest([source1$, source2$]); expectObservable(result$).toBe(expected); }); }); ``` ### 测试错误处理 ```javascript import { of, throwError } from 'rxjs'; import { catchError, retry } from 'rxjs/operators'; it('should catch errors', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-#'); const expected = '-a-b-(d|)'; const result$ = source$.pipe( catchError(() => of('d')) ); expectObservable(result$).toBe(expected); }); }); it('should retry on error', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-#'); const expected = '-a-a-#'; const result$ = source$.pipe( retry(1) ); expectObservable(result$).toBe(expected); }); }); ``` ### 测试订阅和取消订阅 ```javascript import { interval } from 'rxjs'; import { take } from 'rxjs/operators'; it('should handle subscription', () => { testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c-|'); const subs = '^------!'; const result$ = source$.pipe(take(2)); expectObservable(result$).toBe('-a-b-|'); expectSubscriptions(source$.subscriptions).toBe(subs); }); }); it('should handle unsubscription', () => { testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c-d-|'); const subs = '^---!'; const result$ = source$.pipe(take(2)); expectObservable(result$).toBe('-a-b-|'); expectSubscriptions(source$.subscriptions).toBe(subs); }); }); ``` ## 实际应用示例 ### 1. 测试搜索功能 ```javascript import { of } from 'rxjs'; import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators'; function search(query: string) { return of(`Results for ${query}`); } it('should search with debounce', () => { testScheduler.run(({ cold, expectObservable }) => { const input$ = cold('-a--b---c-|'); const expected = '-----b---c-|'; const result$ = input$.pipe( debounceTime(20, testScheduler), distinctUntilChanged(), switchMap(query => search(query)) ); expectObservable(result$).toBe(expected); }); }); ``` ### 2. 测试自动保存 ```javascript import { of } from 'rxjs'; import { debounceTime, switchMap } from 'rxjs/operators'; function save(data: any) { return of('Saved'); } it('should auto-save with debounce', () => { testScheduler.run(({ cold, expectObservable }) => { const changes$ = cold('-a--b---c-|'); const expected = '-----b---c-|'; const result$ = changes$.pipe( debounceTime(20, testScheduler), switchMap(data => save(data)) ); expectObservable(result$).toBe(expected); }); }); ``` ### 3. 测试轮询功能 ```javascript import { interval } from 'rxjs'; import { take, map } from 'rxjs/operators'; it('should poll at intervals', () => { testScheduler.run(({ cold, expectObservable }) => { const expected = '-a-b-c-d-e-|'; const result$ = interval(10, testScheduler).pipe( take(5), map(x => String.fromCharCode(97 + x)) ); expectObservable(result$).toBe(expected); }); }); ``` ### 4. 测试缓存功能 ```javascript import { of } from 'rxjs'; import { shareReplay } from 'rxjs/operators'; it('should cache values', () => { testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c-|'); const expected = '-a-b-c-|'; const subs = ['^------!', ' ^-!']; const cached$ = source$.pipe(shareReplay(1)); expectObservable(cached$).toBe(expected); expectObservable(cached$).toBe('--c-|'); expectSubscriptions(source$.subscriptions).toBe(subs); }); }); ``` ## 高级用法 ### 1. 测试 Hot Observable ```javascript it('should handle hot observable', () => { testScheduler.run(({ hot, expectObservable }) => { const source$ = hot('--a--b--c--|'); const sub = '---^--------!'; const expected = '--b--c--|'; const result$ = source$.pipe(take(2)); expectObservable(result$, sub).toBe(expected); }); }); ``` ### 2. 测试多播 ```javascript import { of } from 'rxjs'; import { share, multicast } from 'rxjs/operators'; import { Subject } from 'rxjs'; it('should multicast correctly', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-a-b-c-|'; const shared$ = source$.pipe(share()); expectObservable(shared$).toBe(expected); expectObservable(shared$).toBe(expected); }); }); ``` ### 3. 测试自定义操作符 ```javascript import { Observable } from 'rxjs'; import { OperatorFunction } from 'rxjs'; function customMap<T, R>(project: (value: T) => R): OperatorFunction<T, R> { return (source$) => new Observable(subscriber => { return source$.subscribe({ next: value => { try { subscriber.next(project(value)); } catch (error) { subscriber.error(error); } }, error: error => subscriber.error(error), complete: () => subscriber.complete() }); }); } it('should use custom operator', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe( customMap(x => x.toUpperCase()) ); expectObservable(result$).toBe(expected); }); }); ``` ## 最佳实践 ### 1. 使用有意义的值 ```javascript // ✅ 好的做法 it('should map values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe(map(x => x.toUpperCase())); expectObservable(result$).toBe(expected, { a: 'a', b: 'b', c: 'c', A: 'A', B: 'B', C: 'C' }); }); }); // ❌ 不好的做法 it('should map values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-d-e-f-|'; const result$ = source$.pipe(map(x => x.toUpperCase())); expectObservable(result$).toBe(expected); }); }); ``` ### 2. 测试边界情况 ```javascript it('should handle empty observable', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('|'); const expected = '|'; const result$ = source$.pipe(map(x => x.toUpperCase())); expectObservable(result$).toBe(expected); }); }); it('should handle error observable', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-#'); const expected = '-#'; const result$ = source$.pipe(map(x => x.toUpperCase())); expectObservable(result$).toBe(expected); }); }); ``` ### 3. 使用 expectSubscriptions ```javascript it('should subscribe and unsubscribe correctly', () => { testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c-|'); const subs = '^------!'; const result$ = source$.pipe(take(3)); expectObservable(result$).toBe('-a-b-c-|'); expectSubscriptions(source$.subscriptions).toBe(subs); }); }); ``` ## 总结 Marble Testing 是 RxJS 中强大的测试工具,它提供了: 1. **可视化测试**: 使用字符串表示时间流,直观易懂 2. **时间控制**: 精确控制异步操作的时序 3. **易于维护**: 清晰的语法和结构 4. **全面覆盖**: 可以测试各种操作符和场景 掌握 Marble Testing 可以显著提升 RxJS 代码的测试质量和开发效率。
服务端 · 2026年2月21日 16:58
RxJS 中 switchMap、mergeMap、concatMap 有什么区别?## switchMap、mergeMap、concatMap 的核心区别 这三个操作符都是用于处理高阶 Observable(Observable of Observable)的,但它们的处理策略完全不同: ### 1. switchMap **特点**: 取消之前的内部 Observable,只处理最新的 **工作原理**: - 当新的值到达时,取消之前未完成的 Observable - 只保留最新的 Observable 的结果 - 适合需要取消旧请求的场景 **示例代码**: ```javascript import { of, interval } from 'rxjs'; import { switchMap, take, delay } from 'rxjs/operators'; // 模拟异步请求 function makeRequest(id) { return of(`Result ${id}`).pipe(delay(1000)); } interval(500).pipe( switchMap(id => makeRequest(id)), take(5) ).subscribe(console.log); // 输出: Result 4 // 解释: 因为每500ms触发一次,但请求需要1000ms // 所以每次新请求来时,之前的请求都被取消了 // 最终只有最后一个请求完成了 ``` **实际应用场景**: - 搜索框输入:每次输入都取消上一次的搜索请求 - 自动完成:只显示最新输入的结果 - 导航切换:取消未完成的页面加载 ```javascript // 搜索框示例 searchInput.pipe( switchMap(query => searchAPI(query)) ).subscribe(results => { // 只显示最新搜索的结果 displayResults(results); }); ``` ### 2. mergeMap **特点**: 并行处理所有内部 Observable **工作原理**: - 同时订阅所有内部 Observable - 所有 Observable 的结果都会被发出 - 不保证顺序,结果可能交错 **示例代码**: ```javascript import { of } from 'rxjs'; import { mergeMap, delay } from 'rxjs/operators'; function makeRequest(id) { return of(`Result ${id}`).pipe(delay(id * 200)); } of(1, 2, 3).pipe( mergeMap(id => makeRequest(id)) ).subscribe(console.log); // 输出: Result 1, Result 2, Result 3 // 解释: 所有请求并行执行,按完成顺序输出 ``` **实际应用场景**: - 并行加载多个资源 - 批量处理独立任务 - 不需要顺序的并发请求 ```javascript // 并行加载用户数据示例 merge( getUserProfile(userId), getUserPosts(userId), getUserComments(userId) ).pipe( mergeMap(response => response.json()) ).subscribe(data => { // 所有数据并行加载完成 updateUI(data); }); ``` ### 3. concatMap **特点**: 顺序处理内部 Observable,一个完成后再处理下一个 **工作原理**: - 按顺序订阅内部 Observable - 当前 Observable 完成后才订阅下一个 - 保证结果的顺序 **示例代码**: ```javascript import { of } from 'rxjs'; import { concatMap, delay } from 'rxjs/operators'; function makeRequest(id) { return of(`Result ${id}`).pipe(delay(500)); } of(1, 2, 3).pipe( concatMap(id => makeRequest(id)) ).subscribe(console.log); // 输出: Result 1, Result 2, Result 3 // 解释: 每个请求按顺序执行,前一个完成后才执行下一个 ``` **实际应用场景**: - 需要保证顺序的请求 - 依赖前一个结果的后续请求 - 防止服务器过载 ```javascript // 顺序上传文件示例 files.pipe( concatMap(file => uploadFile(file)) ).subscribe(result => { // 文件按顺序上传 console.log('Uploaded:', result); }); ``` ## 对比总结 | 特性 | switchMap | mergeMap | concatMap | |------|-----------|----------|-----------| | 执行方式 | 取消旧的,只保留最新的 | 并行执行所有 | 顺序执行 | | 结果顺序 | 只保留最新结果 | 不保证顺序 | 保证顺序 | | 并发数 | 1(同时只有1个) | 无限制 | 1(同时只有1个) | | 适用场景 | 搜索、自动完成 | 并行加载 | 顺序处理 | | 性能 | 最快(取消旧请求) | 最快(并行) | 较慢(顺序) | | 内存占用 | 低 | 高 | 低 | ## 选择指南 ### 使用 switchMap 当: - 需要取消旧请求 - 只关心最新结果 - 搜索、自动完成等场景 - 避免不必要的网络请求 ### 使用 mergeMap 当: - 需要并行处理 - 请求之间没有依赖 - 需要最大化性能 - 不关心结果顺序 ### 使用 concatMap 当: - 需要保证顺序 - 请求之间有依赖 - 需要限制并发数 - 避免服务器过载 ## 性能考虑 ```javascript // 性能对比示例 const source = interval(100).pipe(take(10)); // switchMap: 只处理最后一个 source.pipe( switchMap(x => of(x).pipe(delay(1000))) ).subscribe(); // mergeMap: 并行处理所有(可能造成性能问题) source.pipe( mergeMap(x => of(x).pipe(delay(1000))) ).subscribe(); // concatMap: 顺序处理(可能较慢) source.pipe( concatMap(x => of(x).pipe(delay(1000))) ).subscribe(); ``` ## 实际项目中的选择 ```javascript // 1. 搜索功能 - 使用 switchMap searchInput.pipe( debounceTime(300), distinctUntilChanged(), switchMap(query => searchAPI(query)) ).subscribe(results => displayResults(results)); // 2. 批量加载 - 使用 mergeMap productIds.pipe( mergeMap(id => getProductDetails(id)) ).subscribe(products => renderProducts(products)); // 3. 顺序操作 - 使用 concatMap commands.pipe( concatMap(command => executeCommand(command)) ).subscribe(result => logResult(result)); ``` ## 注意事项 1. **内存泄漏**: mergeMap 可能创建大量并发请求,需要注意内存使用 2. **取消逻辑**: switchMap 会自动取消,但 concatMap 和 mergeMap 不会 3. **错误处理**: 任何一个内部 Observable 出错都会导致整个流失败 4. **取消订阅**: 始终记得取消订阅以避免内存泄漏 ## 最佳实践 ```javascript // 1. 结合错误处理 searchInput.pipe( switchMap(query => searchAPI(query).pipe( catchError(error => of([])) ) ) ).subscribe(results => displayResults(results)); // 2. 限制并发数(使用 mergeMap) import { mergeMap } from 'rxjs/operators'; source.pipe( mergeMap(value => process(value), 3) // 限制并发数为3 ).subscribe(); // 3. 添加重试机制 source.pipe( switchMap(value => apiCall(value).pipe( retry(3), catchError(error => of(defaultValue)) ) ) ).subscribe(); ```
前端 · 2026年2月21日 16:55
RxJS 中 Subject、BehaviorSubject、ReplaySubject 和 AsyncSubject 有什么区别?## Subject 的核心概念 Subject 是 RxJS 中一种特殊的 Observable,它既是 Observable 又是 Observer。这意味着它可以: - 被多个观察者订阅 - 主动推送新值给所有订阅者 - 实现多播(multicast)功能 ## Subject 类型 ### 1. Subject 基础 Subject,每次有新订阅者时,不会回放之前的值 ```javascript const subject = new Subject(); subject.next(1); subject.next(2); const subscription1 = subject.subscribe(value => console.log('订阅者1:', value)); // 订阅者1: 3 // 订阅者1: 4 subject.next(3); subject.next(4); const subscription2 = subject.subscribe(value => console.log('订阅者2:', value)); // 订阅者2: 5 // 订阅者2: 6 subject.next(5); subject.next(6); ``` ### 2. BehaviorSubject BehaviorSubject 会记住最新的值,新订阅者会立即接收到当前值 ```javascript const behaviorSubject = new BehaviorSubject('初始值'); behaviorSubject.subscribe(value => console.log('订阅者1:', value)); // 订阅者1: 初始值 behaviorSubject.next('值1'); // 订阅者1: 值1 behaviorSubject.subscribe(value => console.log('订阅者2:', value)); // 订阅者2: 值1 - 立即收到最新值 behaviorSubject.next('值2'); // 订阅者1: 值2 // 订阅者2: 值2 ``` ### 3. ReplaySubject ReplaySubject 可以回放指定数量的历史值 ```javascript const replaySubject = new ReplaySubject(2); // 回放最后2个值 replaySubject.next('值1'); replaySubject.next('值2'); replaySubject.next('值3'); replaySubject.subscribe(value => console.log('订阅者1:', value)); // 订阅者1: 值2 // 订阅者1: 值3 replaySubject.next('值4'); // 订阅者1: 值4 replaySubject.subscribe(value => console.log('订阅者2:', value)); // 订阅者2: 值3 // 订阅者2: 值4 ``` ### 4. AsyncSubject AsyncSubject 只在完成时发出最后一个值 ```javascript const asyncSubject = new AsyncSubject(); asyncSubject.subscribe(value => console.log('订阅者1:', value)); asyncSubject.next('值1'); asyncSubject.next('值2'); asyncSubject.next('值3'); // 此时还没有输出 asyncSubject.complete(); // 订阅者1: 值3 - 只发出最后一个值 asyncSubject.subscribe(value => console.log('订阅者2:', value)); // 订阅者2: 值3 - 新订阅者也能收到最后一个值 ``` ## 实际应用场景 ### Subject 使用场景 - 事件总线 - 状态管理 - 多个组件共享数据流 ### BehaviorSubject 使用场景 - 存储当前状态 - 表单状态管理 - 用户信息管理 ### ReplaySubject 使用场景 - 缓存历史数据 - 重播操作日志 - 错误重试机制 ### AsyncSubject 使用场景 - HTTP 请求缓存 - 只需要最终结果的计算 - 异步操作的最后值 ## 与普通 Observable 的区别 ```javascript // 普通 Observable - 单播 const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); }); observable.subscribe(v => console.log('观察者1:', v)); observable.subscribe(v => console.log('观察者2:', v)); // 每个订阅者独立执行 // Subject - 多播 const subject = new Subject(); const source = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); }); source.subscribe(subject); subject.subscribe(v => console.log('订阅者1:', v)); subject.subscribe(v => console.log('订阅者2:', v)); // 所有订阅者共享同一个数据流 ``` ## 性能考虑 Subject 在多播场景下可以提高性能,避免重复执行相同的异步操作。但需要注意内存泄漏问题,及时取消订阅。
前端 · 2026年2月21日 16:55
RxJS 中如何进行性能优化?## RxJS 性能优化的核心原则 RxJS 性能优化主要关注以下几个方面: - 减少不必要的订阅和取消订阅 - 优化操作符链的执行效率 - 合理使用多播和缓存 - 避免内存泄漏 - 减少计算开销 ## 优化策略 ### 1. 使用 share 和 shareReplay 避免重复执行相同的 Observable。 ```javascript import { of } from 'rxjs'; import { share, shareReplay } from 'rxjs/operators'; // ❌ 不好的做法:每次订阅都重新执行 const data$ = http.get('/api/data'); data$.subscribe(data => console.log('Subscriber 1:', data)); data$.subscribe(data => console.log('Subscriber 2:', data)); // 发起两次 HTTP 请求 // ✅ 好的做法:共享 Observable const shared$ = http.get('/api/data').pipe( share() // 或 shareReplay(1) ); shared$.subscribe(data => console.log('Subscriber 1:', data)); shared$.subscribe(data => console.log('Subscriber 2:', data)); // 只发起一次 HTTP 请求 ``` ### 2. 使用 debounceTime 和 throttleTime 减少高频事件的处理频率。 ```javascript import { fromEvent } from 'rxjs'; import { debounceTime, throttleTime } from 'rxjs/operators'; // ❌ 不好的做法:处理每个滚动事件 fromEvent(window, 'scroll').subscribe(event => { handleScroll(event); // 可能每秒触发数百次 }); // ✅ 好的做法:节流处理 fromEvent(window, 'scroll').pipe( throttleTime(200) // 每 200ms 最多处理一次 ).subscribe(event => { handleScroll(event); }); // ✅ 好的做法:防抖处理 fromEvent(searchInput, 'input').pipe( debounceTime(300) // 停止输入 300ms 后才处理 ).subscribe(event => { search(event.target.value); }); ``` ### 3. 使用 distinctUntilChanged 避免处理重复的值。 ```javascript import { fromEvent } from 'rxjs'; import { debounceTime, distinctUntilChanged } from 'rxjs/operators'; // ❌ 不好的做法:可能处理相同的搜索词 fromEvent(searchInput, 'input').pipe( debounceTime(300) ).subscribe(event => { search(event.target.value); }); // ✅ 好的做法:只在值变化时处理 fromEvent(searchInput, 'input').pipe( debounceTime(300), distinctUntilChanged() // 避免重复搜索 ).subscribe(event => { search(event.target.value); }); ``` ### 4. 使用 take 和 takeWhile 及时取消不再需要的订阅。 ```javascript import { interval } from 'rxjs'; import { take, takeWhile } from 'rxjs/operators'; // ❌ 不好的做法:无限订阅 interval(1000).subscribe(value => { console.log(value); // 需要手动取消订阅 }); // ✅ 好的做法:自动取消订阅 interval(1000).pipe( take(10) // 只取 10 个值 ).subscribe(value => { console.log(value); }); // ✅ 好的做法:条件取消 interval(1000).pipe( takeWhile(value => value < 10) ).subscribe(value => { console.log(value); }); ``` ### 5. 使用 switchMap 而不是 mergeMap 在需要取消旧操作的场景中。 ```javascript import { fromEvent } from 'rxjs'; import { switchMap, mergeMap } from 'rxjs/operators'; // ❌ 不好的做法:所有请求都会完成 fromEvent(searchInput, 'input').pipe( debounceTime(300), mergeMap(query => searchAPI(query)) // 所有请求都会完成 ).subscribe(results => { displayResults(results); }); // ✅ 好的做法:取消旧请求 fromEvent(searchInput, 'input').pipe( debounceTime(300), switchMap(query => searchAPI(query)) // 取消旧请求 ).subscribe(results => { displayResults(results); }); ``` ### 6. 使用 buffer 和 bufferTime 批量处理数据,减少处理次数。 ```javascript import { interval } from 'rxjs'; import { bufferTime } from 'rxjs/operators'; // ❌ 不好的做法:逐个处理 interval(100).pipe( take(100) ).subscribe(value => { processItem(value); // 处理 100 次 }); // ✅ 好的做法:批量处理 interval(100).pipe( take(100), bufferTime(1000) // 每秒批量处理 ).subscribe(buffer => { processBatch(buffer); // 只处理 10 次 }); ``` ### 7. 使用 filter 提前过滤 尽早过滤掉不需要的数据。 ```javascript import { of } from 'rxjs'; import { map, filter } from 'rxjs/operators'; // ❌ 不好的做法:先处理再过滤 of(1, 2, 3, 4, 5).pipe( map(x => { console.log('Processing:', x); // 处理所有值 return x * 2; }), filter(x => x > 5) ).subscribe(value => { console.log('Result:', value); }); // ✅ 好的做法:先过滤再处理 of(1, 2, 3, 4, 5).pipe( filter(x => x > 2), // 先过滤 map(x => { console.log('Processing:', x); // 只处理需要的值 return x * 2; }) ).subscribe(value => { console.log('Result:', value); }); ``` ### 8. 使用 finalize 清理资源 确保资源被正确释放。 ```javascript import { interval } from 'rxjs'; import { take, finalize } from 'rxjs/operators'; // ❌ 不好的做法:可能忘记清理 const subscription = interval(1000).pipe( take(10) ).subscribe(value => { console.log(value); }); // 可能忘记取消订阅 // ✅ 好的做法:自动清理 interval(1000).pipe( take(10), finalize(() => { console.log('Cleaning up...'); cleanupResources(); }) ).subscribe(value => { console.log(value); }); ``` ## 高级优化技巧 ### 1. 使用 combineLatest 而不是嵌套订阅 ```javascript // ❌ 不好的做法:嵌套订阅 this.userService.getUser(userId).subscribe(user => { this.postsService.getPosts(user.id).subscribe(posts => { this.commentsService.getComments(posts[0].id).subscribe(comments => { // 处理数据 }); }); }); // ✅ 好的做法:使用 combineLatest combineLatest([ this.userService.getUser(userId), this.postsService.getPosts(userId), this.commentsService.getComments(postId) ]).pipe( map(([user, posts, comments]) => ({ user, posts, comments })) ).subscribe(data => { // 处理数据 }); ``` ### 2. 使用 mergeMap 限制并发 ```javascript import { of } from 'rxjs'; import { mergeMap } from 'rxjs/operators'; // ❌ 不好的做法:可能同时发起大量请求 const ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; from(ids).pipe( mergeMap(id => fetchData(id)) // 可能同时发起 10 个请求 ).subscribe(result => { console.log(result); }); // ✅ 好的做法:限制并发数 from(ids).pipe( mergeMap(id => fetchData(id), 3) // 最多同时 3 个请求 ).subscribe(result => { console.log(result); }); ``` ### 3. 使用 ReplaySubject 缓存数据 ```javascript import { ReplaySubject } from 'rxjs'; // ❌ 不好的做法:每次都重新获取 class DataService { getData() { return http.get('/api/data'); } } // ✅ 好的做法:缓存数据 class DataService { private cache$ = new ReplaySubject(1); getData() { if (!this.hasFetched) { http.get('/api/data').subscribe(data => { this.cache$.next(data); this.hasFetched = true; }); } return this.cache$.asObservable(); } } ``` ### 4. 使用 scan 而不是 reduce scan 可以持续发出中间结果,而 reduce 只在完成时发出结果。 ```javascript import { of } from 'rxjs'; import { scan, reduce } from 'rxjs/operators'; // ❌ 不好的做法:只在完成时得到结果 of(1, 2, 3, 4, 5).pipe( reduce((sum, value) => sum + value, 0) ).subscribe(sum => { console.log('Final sum:', sum); // 只输出一次 }); // ✅ 好的做法:持续输出中间结果 of(1, 2, 3, 4, 5).pipe( scan((sum, value) => sum + value, 0) ).subscribe(sum => { console.log('Current sum:', sum); // 输出 5 次 }); ``` ### 5. 使用 tap 进行调试 ```javascript import { of } from 'rxjs'; import { map, filter, tap } from 'rxjs/operators'; // ✅ 好的做法:使用 tap 调试 of(1, 2, 3, 4, 5).pipe( tap(value => console.log('Input:', value)), filter(x => x > 2), tap(value => console.log('After filter:', value)), map(x => x * 2), tap(value => console.log('After map:', value)) ).subscribe(value => { console.log('Output:', value); }); ``` ## 性能监控 ### 1. 使用 performance.now() 测量性能 ```javascript import { of } from 'rxjs'; import { map, tap } from 'rxjs/operators'; const startTime = performance.now(); of(1, 2, 3, 4, 5).pipe( map(x => x * 2), tap(() => { const elapsed = performance.now() - startTime; console.log(`Elapsed: ${elapsed.toFixed(2)}ms`); }) ).subscribe(); ``` ### 2. 使用 count 统计数据量 ```javascript import { of } from 'rxjs'; import { count, tap } from 'rxjs/operators'; of(1, 2, 3, 4, 5).pipe( tap(value => console.log('Processing:', value)), count() ).subscribe(count => { console.log('Total processed:', count); }); ``` ## 常见性能问题 ### 1. 过多的订阅 ```javascript // ❌ 不好的做法:创建多个订阅 const data$ = http.get('/api/data'); data$.subscribe(data => updateUI1(data)); data$.subscribe(data => updateUI2(data)); data$.subscribe(data => updateUI3(data)); // ✅ 好的做法:共享订阅 const data$ = http.get('/api/data').pipe( share() ); data$.subscribe(data => updateUI1(data)); data$.subscribe(data => updateUI2(data)); data$.subscribe(data => updateUI3(data)); ``` ### 2. 内存泄漏 ```javascript // ❌ 不好的做法:忘记取消订阅 class MyComponent { ngOnInit() { this.dataService.getData().subscribe(data => { this.data = data; }); } // 组件销毁时订阅仍然存在 } // ✅ 好的做法:正确取消订阅 class MyComponent implements OnDestroy { private destroy$ = new Subject<void>(); ngOnInit() { this.dataService.getData().pipe( takeUntil(this.destroy$) ).subscribe(data => { this.data = data; }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); } } ``` ### 3. 不必要的计算 ```javascript // ❌ 不好的做法:重复计算 of(1, 2, 3).pipe( map(x => { const result = expensiveCalculation(x); return result; }), map(x => { const result = expensiveCalculation(x); // 重复计算 return result; }) ).subscribe(); // ✅ 好的做法:避免重复计算 of(1, 2, 3).pipe( map(x => { const result = expensiveCalculation(x); return { value: x, result }; }) ).subscribe(data => { console.log(data.value, data.result); }); ``` ## 最佳实践 ### 1. 使用 AsyncPipe ```typescript @Component({ template: ` <div *ngIf="data$ | async as data"> {{ data }} </div> ` }) export class MyComponent { data$ = this.service.getData(); // AsyncPipe 自动管理订阅 } ``` ### 2. 使用 takeUntil ```typescript export class MyComponent implements OnDestroy { private destroy$ = new Subject<void>(); ngOnInit() { this.service.getData().pipe( takeUntil(this.destroy$) ).subscribe(data => { this.data = data; }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); } } ``` ### 3. 使用 shareReplay ```typescript @Injectable() export class DataService { private cache = new Map<string, Observable<any>>(); getData(key: string) { if (!this.cache.has(key)) { this.cache.set(key, http.get(`/api/data/${key}`).pipe( shareReplay(1) )); } return this.cache.get(key)!; } } ``` ## 总结 RxJS 性能优化的关键点: 1. **共享订阅**: 使用 share 和 shareReplay 避免重复执行 2. **减少处理频率**: 使用 debounceTime 和 throttleTime 3. **过滤重复数据**: 使用 distinctUntilChanged 4. **及时取消订阅**: 使用 take 和 takeWhile 5. **选择合适的操作符**: switchMap vs mergeMap vs concatMap 6. **批量处理**: 使用 buffer 和 bufferTime 7. **提前过滤**: 使用 filter 尽早过滤不需要的数据 8. **清理资源**: 使用 finalize 确保资源释放 9. **避免嵌套订阅**: 使用 combineLatest 和 forkJoin 10. **限制并发**: 使用 mergeMap 的并发参数 掌握这些优化技巧可以显著提升 RxJS 应用的性能。
前端 · 2026年2月21日 16:54
RxJS 在 Angular 中如何应用?## RxJS 在 Angular 中的应用 RxJS 是 Angular 框架的核心依赖,广泛应用于异步操作、事件处理和数据流管理。 ## 核心应用场景 ### 1. HTTP 请求 Angular 的 HttpClient 返回 Observable,便于处理异步请求。 ```typescript import { HttpClient } from '@angular/common/http'; import { Observable } from 'rxjs'; @Injectable({ providedIn: 'root' }) export class DataService { constructor(private http: HttpClient) {} getUsers(): Observable<User[]> { return this.http.get<User[]>('/api/users'); } getUserById(id: number): Observable<User> { return this.http.get<User>(`/api/users/${id}`); } createUser(user: User): Observable<User> { return this.http.post<User>('/api/users', user); } updateUser(id: number, user: User): Observable<User> { return this.http.put<User>(`/api/users/${id}`, user); } deleteUser(id: number): Observable<void> { return this.http.delete<void>(`/api/users/${id}`); } } ``` **在组件中使用**: ```typescript import { Component, OnInit } from '@angular/core'; import { DataService } from './data.service'; @Component({ selector: 'app-user-list', template: ` <div *ngIf="users$ | async as users"> <div *ngFor="let user of users"> {{ user.name }} </div> </div> ` }) export class UserListComponent implements OnInit { users$: Observable<User[]>; constructor(private dataService: DataService) {} ngOnInit() { this.users$ = this.dataService.getUsers(); } } ``` ### 2. 表单处理 Angular 的响应式表单与 RxJS 完美集成。 ```typescript import { Component, OnInit } from '@angular/core'; import { FormBuilder, FormGroup, Validators } from '@angular/forms'; @Component({ selector: 'app-search-form', template: ` <form [formGroup]="searchForm"> <input formControlName="search" placeholder="Search..."> </form> ` }) export class SearchFormComponent implements OnInit { searchForm: FormGroup; constructor(private fb: FormBuilder) { this.searchForm = this.fb.group({ search: ['', Validators.minLength(3)] }); } ngOnInit() { // 监听搜索输入 this.searchForm.get('search')?.valueChanges.pipe( debounceTime(300), distinctUntilChanged(), filter(query => query.length >= 3), switchMap(query => this.search(query)) ).subscribe(results => { this.displayResults(results); }); } search(query: string): Observable<SearchResult[]> { return this.http.get<SearchResult[]>(`/api/search?q=${query}`); } displayResults(results: SearchResult[]) { // 显示搜索结果 } } ``` ### 3. 路由处理 使用 RxJS 处理路由参数和查询参数。 ```typescript import { Component, OnInit } from '@angular/core'; import { ActivatedRoute, Router } from '@angular/router'; @Component({ selector: 'app-user-detail', template: ` <div *ngIf="user$ | async as user"> <h1>{{ user.name }}</h1> <p>{{ user.email }}</p> </div> ` }) export class UserDetailComponent implements OnInit { user$: Observable<User>; constructor( private route: ActivatedRoute, private router: Router, private dataService: DataService ) {} ngOnInit() { // 监听路由参数变化 this.user$ = this.route.paramMap.pipe( switchMap(params => { const id = Number(params.get('id')); return this.dataService.getUserById(id); }) ); } navigateToUser(id: number) { this.router.navigate(['/users', id]); } } ``` ### 4. 状态管理 使用 BehaviorSubject 或 NgRx 进行状态管理。 **简单状态管理**: ```typescript import { Injectable } from '@angular/core'; import { BehaviorSubject, Observable } from 'rxjs'; @Injectable({ providedIn: 'root' }) export class StateService { private state$ = new BehaviorSubject<AppState>({ user: null, isLoading: false, error: null }); getState(): Observable<AppState> { return this.state$.asObservable(); } updateUser(user: User) { const currentState = this.state$.value; this.state$.next({ ...currentState, user }); } setLoading(loading: boolean) { const currentState = this.state$.value; this.state$.next({ ...currentState, isLoading: loading }); } } ``` **在组件中使用**: ```typescript @Component({ selector: 'app-app', template: ` <div *ngIf="state$ | async as state"> <div *ngIf="state.isLoading">Loading...</div> <div *ngIf="state.user">Welcome, {{ state.user.name }}</div> </div> ` }) export class AppComponent { state$: Observable<AppState>; constructor(private stateService: StateService) { this.state$ = this.stateService.getState(); } } ``` ## 高级应用模式 ### 1. 使用 AsyncPipe AsyncPipe 自动管理订阅和取消订阅。 ```typescript @Component({ selector: 'app-user-list', template: ` <div *ngIf="users$ | async as users"> <div *ngFor="let user of users"> {{ user.name }} </div> </div> ` }) export class UserListComponent { users$: Observable<User[]>; constructor(private dataService: DataService) { this.users$ = this.dataService.getUsers(); } } ``` ### 2. 使用 takeUntil 防止内存泄漏 ```typescript import { Component, OnInit, OnDestroy } from '@angular/core'; import { Subject } from 'rxjs'; import { takeUntil } from 'rxjs/operators'; @Component({ selector: 'app-component', template: `...` }) export class MyComponent implements OnInit, OnDestroy { private destroy$ = new Subject<void>(); ngOnInit() { this.dataService.getUsers().pipe( takeUntil(this.destroy$) ).subscribe(users => { this.users = users; }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); } } ``` ### 3. 使用 shareReplay 缓存数据 ```typescript import { Injectable } from '@angular/core'; import { Observable, shareReplay } from 'rxjs'; @Injectable({ providedIn: 'root' }) export class CacheService { private cache = new Map<string, Observable<any>>(); get<T>(key: string, fetchFn: () => Observable<T>): Observable<T> { if (!this.cache.has(key)) { this.cache.set(key, fetchFn().pipe( shareReplay(1) )); } return this.cache.get(key) as Observable<T>; } clear() { this.cache.clear(); } } ``` ### 4. 使用 combineLatest 组合多个数据源 ```typescript @Component({ selector: 'app-dashboard', template: ` <div *ngIf="dashboardData$ | async as data"> <h2>Users: {{ data.users.length }}</h2> <h2>Posts: {{ data.posts.length }}</h2> <h2>Comments: {{ data.comments.length }}</h2> </div> ` }) export class DashboardComponent { dashboardData$: Observable<DashboardData>; constructor(private dataService: DataService) { this.dashboardData$ = combineLatest([ this.dataService.getUsers(), this.dataService.getPosts(), this.dataService.getComments() ]).pipe( map(([users, posts, comments]) => ({ users, posts, comments })) ); } } ``` ## 常见问题和解决方案 ### 1. 处理错误 ```typescript this.dataService.getUsers().pipe( catchError(error => { console.error('Failed to load users:', error); return of([]); // 返回空数组作为降级 }) ).subscribe(users => { this.users = users; }); ``` ### 2. 重试失败的请求 ```typescript this.dataService.getUsers().pipe( retry(3), // 重试 3 次 catchError(error => { console.error('Failed after retries:', error); return of([]); }) ).subscribe(users => { this.users = users; }); ``` ### 3. 加载状态管理 ```typescript @Component({ selector: 'app-user-list', template: ` <div *ngIf="isLoading">Loading...</div> <div *ngIf="users$ | async as users"> <div *ngFor="let user of users"> {{ user.name }} </div> </div> ` }) export class UserListComponent { isLoading = false; users$: Observable<User[]>; constructor(private dataService: DataService) {} loadUsers() { this.isLoading = true; this.dataService.getUsers().pipe( finalize(() => { this.isLoading = false; }) ).subscribe(users => { this.users = users; }); } } ``` ### 4. 搜索防抖 ```typescript @Component({ selector: 'app-search', template: ` <input #searchInput (input)="onSearch($event)" placeholder="Search..."> <div *ngIf="results$ | async as results"> <div *ngFor="let result of results"> {{ result.name }} </div> </div> ` }) export class SearchComponent { results$: Observable<SearchResult[]>; constructor(private dataService: DataService) {} onSearch(event: Event) { const query = (event.target as HTMLInputElement).value; this.results$ = of(query).pipe( debounceTime(300), distinctUntilChanged(), switchMap(q => this.dataService.search(q)) ); } } ``` ## 最佳实践 ### 1. 使用 AsyncPipe ```typescript // ✅ 推荐 @Component({ template: `<div *ngIf="data$ | async as data">{{ data }}</div>` }) export class MyComponent { data$ = this.service.getData(); } // ❌ 不推荐 @Component({ template: `<div>{{ data }}</div>` }) export class MyComponent implements OnInit, OnDestroy { data: any; private subscription: Subscription; ngOnInit() { this.subscription = this.service.getData().subscribe(data => { this.data = data; }); } ngOnDestroy() { this.subscription.unsubscribe(); } } ``` ### 2. 防止内存泄漏 ```typescript // ✅ 推荐 export class MyComponent implements OnDestroy { private destroy$ = new Subject<void>(); ngOnInit() { this.service.getData().pipe( takeUntil(this.destroy$) ).subscribe(data => { this.data = data; }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); } } // ❌ 不推荐 export class MyComponent { ngOnInit() { this.service.getData().subscribe(data => { this.data = data; }); // 忘记取消订阅 } } ``` ### 3. 错误处理 ```typescript // ✅ 推荐 this.service.getData().pipe( catchError(error => { console.error('Error:', error); return of(defaultData); }) ).subscribe(data => { this.data = data; }); // ❌ 不推荐 this.service.getData().subscribe({ next: data => { this.data = data; }, error: error => { console.error('Error:', error); // 没有降级处理 } }); ``` ### 4. 类型安全 ```typescript // ✅ 推荐 interface User { id: number; name: string; email: string; } this.http.get<User[]>('/api/users').subscribe(users => { users.forEach(user => { console.log(user.name); // 类型安全 }); }); // ❌ 不推荐 this.http.get('/api/users').subscribe((users: any) => { users.forEach((user: any) => { console.log(user.name); // 没有类型检查 }); }); ``` ## 总结 RxJS 在 Angular 中的关键应用: 1. **HTTP 请求**: 使用 HttpClient 处理异步请求 2. **表单处理**: 监听表单值变化,实现防抖和验证 3. **路由处理**: 监听路由参数和查询参数变化 4. **状态管理**: 使用 BehaviorSubject 或 NgRx 管理应用状态 5. **AsyncPipe**: 自动管理订阅,防止内存泄漏 6. **错误处理**: 使用 catchError 和 retry 处理错误 7. **性能优化**: 使用 debounceTime、shareReplay 等优化性能 掌握 RxJS 在 Angular 中的应用是成为 Angular 高级开发者的关键。
服务端 · 2026年2月21日 16:54
RxJS 6 和 RxJS 7 有什么区别?## RxJS 6 与 RxJS 7 的主要变化 ### 1. 导入方式的变化 **RxJS 6**: ```javascript // 创建操作符 import { of, from, interval } from 'rxjs'; // 操作符(使用 pipe) import { map, filter, switchMap } from 'rxjs/operators'; // 工具函数 import { Subscription } from 'rxjs'; ``` **RxJS 7**: ```javascript // 导入方式基本相同,但更加模块化 import { of, from, interval } from 'rxjs'; import { map, filter, switchMap } from 'rxjs/operators'; // 新增了一些操作符 import { debounceTime, distinctUntilChanged } from 'rxjs/operators'; ``` ### 2. 新增的操作符 **RxJS 7 新增了多个有用的操作符**: #### 1. `partition` 根据谓词函数将 Observable 分成两个。 ```javascript import { of, partition } from 'rxjs'; const source$ = of(1, 2, 3, 4, 5, 6); const [evens$, odds$] = partition(source$, x => x % 2 === 0); evens$.subscribe(x => console.log('Even:', x)); // 2, 4, 6 odds$.subscribe(x => console.log('Odd:', x)); // 1, 3, 5 ``` #### 2. `tap` 的改进 `tap` 现在接受一个对象,可以分别处理不同的通知类型。 ```javascript import { of } from 'rxjs'; import { tap } from 'rxjs/operators'; of(1, 2, 3).pipe( tap({ subscribe: () => console.log('Subscribed'), next: value => console.log('Next:', value), error: error => console.log('Error:', error), complete: () => console.log('Completed'), unsubscribe: () => console.log('Unsubscribed') }) ).subscribe(); ``` #### 3. `connectable` 操作符 简化了多播 Observable 的创建。 ```javascript import { interval, connectable } from 'rxjs'; import { take } from 'rxjs/operators'; const source$ = interval(1000).pipe(take(5)); const connectable$ = connectable(source$); connectable$.subscribe(value => console.log('Subscriber 1:', value)); connectable$.subscribe(value => console.log('Subscriber 2:', value)); connectable$.connect(); ``` #### 4. `shareReplay` 的改进 `shareReplay` 现在支持配置对象。 ```javascript import { interval } from 'rxjs'; import { shareReplay, take } from 'rxjs/operators'; const shared$ = interval(1000).pipe( take(5), shareReplay({ bufferSize: 2, refCount: true }) ); ``` #### 5. `filter` 的类型推断改进 更好的 TypeScript 类型推断。 ```javascript import { of } from 'rxjs'; import { filter } from 'rxjs/operators'; const source$ = of(1, 2, 3, 4, 5); // RxJS 7 中类型推断更准确 const even$ = source$.pipe( filter(x => x % 2 === 0) // x 被推断为 number ); even$.subscribe(x => console.log(x)); // x 是 number 类型 ``` ### 3. 废弃的操作符 以下操作符在 RxJS 7 中被废弃: #### 1. `throwError` **RxJS 6**: ```javascript import { throwError } from 'rxjs'; throwError('Error message'); ``` **RxJS 7**: ```javascript import { throwError } from 'rxjs'; // 推荐使用工厂函数 throwError(() => new Error('Error message')); ``` #### 2. `concat` 和 `merge` 的静态方法 虽然仍然可用,但推荐使用 `concatWith` 和 `mergeWith`。 ```javascript import { of, concat, merge } from 'rxjs'; // 仍然可用 concat(of(1), of(2)).subscribe(); merge(of(1), of(2)).subscribe(); // 推荐使用 of(1).pipe(concatWith(of(2))).subscribe(); of(1).pipe(mergeWith(of(2))).subscribe(); ``` ### 4. 性能优化 #### 1. 更小的包体积 RxJS 7 通过树摇优化减少了包体积。 ```javascript // RxJS 7 只导入需要的操作符 import { of } from 'rxjs'; import { map } from 'rxjs/operators'; // 打包时只会包含 of 和 map ``` #### 2. 更快的执行速度 某些操作符的执行速度得到了优化。 ```javascript import { of } from 'rxjs'; import { map, filter } from 'rxjs/operators'; // RxJS 7 中这些操作符执行更快 of(1, 2, 3, 4, 5).pipe( map(x => x * 2), filter(x => x > 5) ).subscribe(); ``` ### 5. TypeScript 改进 #### 1. 更好的类型推断 ```javascript import { of } from 'rxjs'; import { map, filter } from 'rxjs/operators'; // RxJS 7 中类型推断更准确 const result$ = of(1, 2, 3).pipe( map(x => x.toString()), // 推断为 Observable<string> filter(x => x.length > 0) ); ``` #### 2. 严格类型检查 ```javascript import { of } from 'rxjs'; import { map } from 'rxjs/operators'; // RxJS 7 中类型检查更严格 of(1, 2, 3).pipe( map(x => x.toUpperCase()) // TypeScript 错误:number 没有 toUpperCase ); ``` ### 6. 错误处理改进 #### 1. 更好的错误信息 ```javascript import { of } from 'rxjs'; import { map } from 'rxjs/operators'; of(1, 2, 3).pipe( map(x => { if (x === 2) throw new Error('Custom error'); return x; }) ).subscribe({ error: error => { console.log(error.message); // 更清晰的错误信息 } }); ``` #### 2. `onErrorResumeNext` 的改进 ```javascript import { of, onErrorResumeNext } from 'rxjs'; const source1$ = of(1, 2, 3).pipe( map(x => { if (x === 2) throw new Error('Error'); return x; }) ); const source2$ = of(4, 5, 6); onErrorResumeNext(source1$, source2$).subscribe(console.log); // 输出: 1, 4, 5, 6 ``` ### 7. 调度器改进 #### 1. `animationFrameScheduler` 的改进 ```javascript import { interval, animationFrameScheduler } from 'rxjs'; import { take } from 'rxjs/operators'; // RxJS 7 中 animationFrameScheduler 更稳定 interval(0, animationFrameScheduler).pipe( take(60) ).subscribe(frame => { console.log('Frame:', frame); }); ``` #### 2. `asapScheduler` 的改进 ```javascript import { of, asapScheduler } from 'rxjs'; // RxJS 7 中 asapScheduler 性能更好 of(1, 2, 3, asapScheduler).subscribe(value => { console.log(value); }); ``` ### 8. 测试工具改进 #### 1. `TestScheduler` 的改进 ```javascript import { TestScheduler } from 'rxjs/testing'; // RxJS 7 中 TestScheduler 更易用 const testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c|'); const expected = '-a-b-c|'; expectObservable(source$).toBe(expected); }); ``` #### 2. `marble testing` 的改进 ```javascript import { TestScheduler } from 'rxjs/testing'; // RxJS 7 中 marble testing 更直观 testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c|'); const expected = '-a-b-c|'; expectObservable(source$).toBe(expected); }); ``` ### 9. 文档和示例改进 #### 1. 更好的文档 RxJS 7 提供了更详细的文档和示例。 ```javascript // 官方文档提供了更多实用的示例 import { of } from 'rxjs'; import { map, filter } from 'rxjs/operators'; of(1, 2, 3, 4, 5).pipe( map(x => x * 2), filter(x => x > 5) ).subscribe(console.log); ``` #### 2. 更多的最佳实践 官方文档中包含了更多的最佳实践建议。 ```javascript // 推荐使用 pipe 而不是链式调用 import { of } from 'rxjs'; import { map, filter } from 'rxjs/operators'; // ✅ 推荐 of(1, 2, 3).pipe( map(x => x * 2), filter(x => x > 5) ).subscribe(); // ❌ 不推荐 of(1, 2, 3).pipe( map(x => x * 2) ).pipe( filter(x => x > 5) ).subscribe(); ``` ### 10. 迁移指南 #### 1. 从 RxJS 6 迁移到 RxJS 7 ```bash # 升级 RxJS npm install rxjs@7 # 检查废弃的 API npm run lint ``` #### 2. 常见迁移问题 **问题 1**: `throwError` 的使用 ```javascript // ❌ RxJS 6 throwError('Error message'); // ✅ RxJS 7 throwError(() => new Error('Error message')); ``` **问题 2**: 类型推断问题 ```javascript // 可能需要显式类型 import { of } from 'rxjs'; import { map } from 'rxjs/operators'; const result$ = of(1, 2, 3).pipe( map(x => x.toString()) ) as Observable<string>; ``` ## 总结 RxJS 7 相比 RxJS 6 的主要改进: 1. **新增操作符**: `partition`、`connectable` 等 2. **性能优化**: 更小的包体积,更快的执行速度 3. **TypeScript 改进**: 更好的类型推断和严格类型检查 4. **错误处理**: 更好的错误信息和处理机制 5. **调度器改进**: 更稳定和高效的调度器 6. **测试工具**: 更易用的 TestScheduler 7. **文档改进**: 更详细的文档和示例 迁移到 RxJS 7 可以带来更好的性能和开发体验,但需要注意一些 API 的变化。
前端 · 2026年2月21日 16:54
RxJS 中的 Observable 和 Promise 有什么区别?## 核心区别 Observable 和 Promise 都是处理异步操作的工具,但它们在设计理念和使用方式上有显著差异: ### 1. 执行时机 **Promise**: 一旦创建就会立即执行,无法取消 ```javascript const promise = new Promise((resolve) => { console.log('Promise 立即执行'); resolve('done'); }); ``` **Observable**: 只有订阅时才会执行,可以取消订阅 ```javascript const observable = new Observable((observer) => { console.log('Observable 订阅时执行'); observer.next('data'); }); observable.subscribe(); ``` ### 2. 数据流 **Promise**: 只能发出一个值,然后完成或失败 ```javascript promise.then(value => console.log(value)); // 只接收一个值 ``` **Observable**: 可以发出多个值,随时间推移持续推送数据 ```javascript observable.subscribe(value => console.log(value)); // 可以接收多个值 ``` ### 3. 可取消性 **Promise**: 无法取消,一旦创建就会执行到底 ```javascript const promise = fetch('/api/data'); // 无法中途取消这个请求 ``` **Observable**: 可以通过 unsubscribe() 取消订阅 ```javascript const subscription = observable.subscribe(); subscription.unsubscribe(); // 取消订阅 ``` ### 4. 操作符支持 **Promise**: 只有链式调用 then/catch/finally ```javascript promise .then(data => processData(data)) .then(result => console.log(result)) .catch(error => handleError(error)); ``` **Observable**: 丰富的操作符生态系统 ```javascript observable .pipe( map(data => processData(data)), filter(result => result.isValid), catchError(error => handleError(error)) ) .subscribe(); ``` ### 5. 懒加载 vs 急切加载 **Promise**: 急切执行,创建时就开始工作 **Observable**: 懒加载,直到订阅才开始执行 ### 6. 同步/异步 **Promise**: 总是异步的 **Observable**: 可以是同步或异步的 ## 实际应用场景 ### 使用 Promise 的场景 - 单次异步操作 - 不需要取消 - 只需要一个结果 - 简单的异步流程 ### 使用 Observable 的场景 - 需要处理多个值 - 需要取消操作 - 复杂的数据流处理 - 事件处理 - WebSocket 连接 - 实时数据流 ## 性能考虑 Observable 在处理复杂异步流程时提供了更好的性能和灵活性,特别是在需要处理多个异步操作或需要取消操作的场景中。Promise 更适合简单的异步操作,代码更简洁直观。
前端 · 2026年2月21日 16:28
RxJS 中常用的操作符有哪些?如何使用?## 常用操作符分类 ### 1. 创建操作符 (Creation Operators) #### of 创建一个发出指定值的 Observable ```javascript import { of } from 'rxjs'; of(1, 2, 3).subscribe(console.log); // 输出: 1, 2, 3 ``` #### from 将数组、Promise、Iterable 等转换为 Observable ```javascript import { from } from 'rxjs'; from([1, 2, 3]).subscribe(console.log); // 输出: 1, 2, 3 from(Promise.resolve('Hello')).subscribe(console.log); // 输出: Hello ``` #### interval / timer 创建定时发出的 Observable ```javascript import { interval, timer } from 'rxjs'; interval(1000).subscribe(console.log); // 每秒发出一个递增数字: 0, 1, 2, 3... timer(2000, 1000).subscribe(console.log); // 2秒后开始,每秒发出一个数字: 0, 1, 2, 3... ``` ### 2. 转换操作符 (Transformation Operators) #### map 转换每个发出的值 ```javascript import { of } from 'rxjs'; import { map } from 'rxjs/operators'; of(1, 2, 3).pipe( map(x => x * 2) ).subscribe(console.log); // 输出: 2, 4, 6 ``` #### switchMap 取消之前的内部 Observable,只处理最新的 ```javascript import { fromEvent } from 'rxjs'; import { switchMap } from 'rxjs/operators'; fromEvent(document, 'click').pipe( switchMap(() => fetch('/api/data')) ).subscribe(response => console.log(response)); ``` #### mergeMap 并行处理所有内部 Observable ```javascript import { of } from 'rxjs'; import { mergeMap } from 'rxjs/operators'; of(1, 2, 3).pipe( mergeMap(x => of(x, x * 2)) ).subscribe(console.log); // 输出: 1, 2, 2, 4, 3, 6 ``` #### concatMap 顺序处理内部 Observable,一个完成后再处理下一个 ```javascript import { of } from 'rxjs'; import { concatMap } from 'rxjs/operators'; of(1, 2, 3).pipe( concatMap(x => of(x, x * 2)) ).subscribe(console.log); // 输出: 1, 2, 2, 4, 3, 6 (顺序执行) ``` ### 3. 过滤操作符 (Filtering Operators) #### filter 过滤符合条件的值 ```javascript import { of } from 'rxjs'; import { filter } from 'rxjs/operators'; of(1, 2, 3, 4, 5).pipe( filter(x => x % 2 === 0) ).subscribe(console.log); // 输出: 2, 4 ``` #### take / takeLast / takeUntil 只取前几个、后几个或直到某个条件 ```javascript import { interval, fromEvent } from 'rxjs'; import { take, takeUntil } from 'rxjs/operators'; interval(1000).pipe( take(3) ).subscribe(console.log); // 输出: 0, 1, 2 interval(1000).pipe( takeUntil(fromEvent(document, 'click')) ).subscribe(console.log); // 点击时停止 ``` #### debounceTime 在指定时间内只发出最后一个值 ```javascript import { fromEvent } from 'rxjs'; import { debounceTime } from 'rxjs/operators'; fromEvent(inputElement, 'input').pipe( debounceTime(300) ).subscribe(event => console.log(event.target.value)); ``` #### throttleTime 在指定时间内只发出第一个值 ```javascript import { fromEvent } from 'rxjs'; import { throttleTime } from 'rxjs/operators'; fromEvent(window, 'scroll').pipe( throttleTime(200) ).subscribe(event => console.log('Scroll event')); ``` ### 4. 组合操作符 (Combination Operators) #### merge 合并多个 Observable,并行发出值 ```javascript import { merge, interval } from 'rxjs'; merge( interval(1000).pipe(map(x => `A${x}`)), interval(1500).pipe(map(x => `B${x}`)) ).subscribe(console.log); ``` #### concat 顺序连接多个 Observable ```javascript import { concat, of } from 'rxjs'; concat( of(1, 2), of(3, 4), of(5, 6) ).subscribe(console.log); // 输出: 1, 2, 3, 4, 5, 6 ``` #### combineLatest 组合多个 Observable 的最新值 ```javascript import { combineLatest, of } from 'rxjs'; combineLatest([ of(1, 2, 3), of('a', 'b', 'c') ]).subscribe(([num, char]) => console.log(num, char)); // 输出: [3, 'a'], [3, 'b'], [3, 'c'] ``` #### zip 按索引组合多个 Observable 的值 ```javascript import { zip, of } from 'rxjs'; zip( of(1, 2, 3), of('a', 'b', 'c') ).subscribe(([num, char]) => console.log(num, char)); // 输出: [1, 'a'], [2, 'b'], [3, 'c'] ``` ### 5. 错误处理操作符 (Error Handling Operators) #### catchError 捕获错误并返回新的 Observable ```javascript import { of } from 'rxjs'; import { map, catchError } from 'rxjs/operators'; of(1, 2, 3, 4).pipe( map(x => { if (x === 3) throw new Error('Error!'); return x; }), catchError(error => of('default value')) ).subscribe(console.log); // 输出: 1, 2, 'default value' ``` #### retry 重试失败的 Observable ```javascript import { of } from 'rxjs'; import { map, retry } from 'rxjs/operators'; let count = 0; of(1, 2, 3).pipe( map(x => { count++; if (count < 3) throw new Error('Error!'); return x; }), retry(2) ).subscribe(console.log); ``` ### 6. 工具操作符 (Utility Operators) #### tap 执行副作用,不修改值 ```javascript import { of } from 'rxjs'; import { tap, map } from 'rxjs/operators'; of(1, 2, 3).pipe( tap(x => console.log('Before:', x)), map(x => x * 2), tap(x => console.log('After:', x)) ).subscribe(); ``` #### delay 延迟发出值 ```javascript import { of } from 'rxjs'; import { delay } from 'rxjs/operators'; of(1, 2, 3).pipe( delay(1000) ).subscribe(console.log); // 1秒后输出: 1, 2, 3 ``` ## 实际应用示例 ### 搜索框防抖 ```javascript fromEvent(searchInput, 'input').pipe( debounceTime(300), map(event => event.target.value), filter(query => query.length > 2), switchMap(query => searchAPI(query)) ).subscribe(results => displayResults(results)); ``` ### 自动保存 ```javascript form.valueChanges.pipe( debounceTime(1000), distinctUntilChanged(), switchMap(formData => saveAPI(formData)) ).subscribe(); ``` ### 并行请求 ```javascript merge( getUserData(userId), getUserPosts(userId), getUserComments(userId) ).pipe( combineLatestAll() ).subscribe(([user, posts, comments]) => { renderUserProfile(user, posts, comments); }); ``` ## 最佳实践 1. 合理使用 pipe() 链式调用 2. 注意操作符的执行顺序 3. 及时取消订阅避免内存泄漏 4. 根据场景选择合适的操作符 5. 使用 TypeScript 获得更好的类型推断
前端 · 2026年2月21日 16:28
如何在 RxJS 中防止内存泄漏?## 内存泄漏的原因 在 RxJS 中,内存泄漏主要发生在以下几种情况: ### 1. 未取消订阅 最常见的内存泄漏原因是订阅了 Observable 但没有取消订阅。 ```javascript // ❌ 错误示例:内存泄漏 class MyComponent { constructor() { this.data$ = http.get('/api/data').subscribe(data => { console.log(data); }); } } // 组件销毁时,订阅仍然存在,导致内存泄漏 ``` ### 2. 长期运行的 Observable interval、fromEvent 等会持续发出值的 Observable,如果不取消订阅会持续占用内存。 ```javascript // ❌ 错误示例 setInterval(() => { console.log('Running...'); }, 1000); // ✅ 正确示例 const subscription = interval(1000).subscribe(); subscription.unsubscribe(); ``` ### 3. 闭包引用 订阅回调中引用了外部变量,导致这些变量无法被垃圾回收。 ```javascript // ❌ 错误示例 function createSubscription() { const largeData = new Array(1000000).fill('data'); return interval(1000).subscribe(() => { console.log(largeData.length); // largeData 被闭包引用 }); } const sub = createSubscription(); // 即使 sub 不再使用,largeData 也不会被释放 ``` ### 4. 事件监听器未移除 使用 fromEvent 创建的订阅如果不取消,事件监听器会一直存在。 ```javascript // ❌ 错误示例 fromEvent(document, 'click').subscribe(event => { console.log('Clicked'); }); // 事件监听器永远不会被移除 ``` ## 防止内存泄漏的方法 ### 1. 手动取消订阅 最直接的方法是在适当的时候调用 unsubscribe()。 ```javascript class MyComponent { private subscriptions: Subscription[] = []; ngOnInit() { const sub1 = http.get('/api/data').subscribe(data => { this.data = data; }); const sub2 = interval(1000).subscribe(() => { this.update(); }); this.subscriptions.push(sub1, sub2); } ngOnDestroy() { this.subscriptions.forEach(sub => sub.unsubscribe()); } } ``` ### 2. 使用 takeUntil takeUntil 是最常用的取消订阅方式之一。 ```javascript import { Subject, takeUntil } from 'rxjs'; class MyComponent { private destroy$ = new Subject<void>(); ngOnInit() { http.get('/api/data').pipe( takeUntil(this.destroy$) ).subscribe(data => { this.data = data; }); interval(1000).pipe( takeUntil(this.destroy$) ).subscribe(() => { this.update(); }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); } } ``` ### 3. 使用 take、takeWhile、takeLast 根据条件自动取消订阅。 ```javascript // take: 只取前 N 个值 interval(1000).pipe( take(5) ).subscribe(value => console.log(value)); // 输出: 0, 1, 2, 3, 4 然后自动取消订阅 // takeWhile: 满足条件时继续订阅 interval(1000).pipe( takeWhile(value => value < 5) ).subscribe(value => console.log(value)); // 输出: 0, 1, 2, 3, 4 然后自动取消订阅 // takeLast: 只取最后 N 个值 of(1, 2, 3, 4, 5).pipe( takeLast(2) ).subscribe(value => console.log(value)); // 输出: 4, 5 ``` ### 4. 使用 first 只取第一个值,然后自动取消订阅。 ```javascript http.get('/api/data').pipe( first() ).subscribe(data => { console.log(data); }); // 只发出第一个值就完成 ``` ### 5. 使用 AsyncPipe(Angular) 在 Angular 中,AsyncPipe 会自动管理订阅。 ```typescript @Component({ template: ` <div *ngIf="data$ | async as data"> {{ data }} </div> ` }) export class MyComponent { data$ = http.get('/api/data'); // AsyncPipe 会自动取消订阅 } ``` ### 6. 使用 finalize 在取消订阅时执行清理操作。 ```javascript http.get('/api/data').pipe( finalize(() => { console.log('Cleaning up...'); // 执行清理操作 }) ).subscribe(data => { console.log(data); }); ``` ## 最佳实践 ### 1. 组件级别的订阅管理 ```typescript import { Component, OnDestroy } from '@angular/core'; import { Subject, takeUntil } from 'rxjs'; @Component({ selector: 'app-my', template: '...' }) export class MyComponent implements OnDestroy { private destroy$ = new Subject<void>(); constructor() { this.setupSubscriptions(); } private setupSubscriptions() { // 所有订阅都使用 takeUntil this.http.get('/api/user').pipe( takeUntil(this.destroy$) ).subscribe(user => { this.user = user; }); this.route.params.pipe( takeUntil(this.destroy$) ).subscribe(params => { this.loadPage(params.id); }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); } } ``` ### 2. 创建可重用的取消订阅工具 ```typescript import { Subject, Observable } from 'rxjs'; import { takeUntil } from 'rxjs/operators'; export class AutoUnsubscribe { private destroy$ = new Subject<void>(); protected autoUnsubscribe<T>(observable: Observable<T>): Observable<T> { return observable.pipe(takeUntil(this.destroy$)); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); } } // 使用 class MyComponent extends AutoUnsubscribe { ngOnInit() { this.autoUnsubscribe(http.get('/api/data')).subscribe(data => { console.log(data); }); } } ``` ### 3. 使用 Subscription 集合 ```typescript import { Subscription } from 'rxjs'; class MyService { private subscriptions = new Subscription(); startMonitoring() { const sub1 = interval(1000).subscribe(); const sub2 = fromEvent(document, 'click').subscribe(); this.subscriptions.add(sub1); this.subscriptions.add(sub2); } stopMonitoring() { this.subscriptions.unsubscribe(); } } ``` ### 4. 避免在回调中创建订阅 ```typescript // ❌ 错误示例 interval(1000).subscribe(() => { http.get('/api/data').subscribe(data => { console.log(data); }); // 每次都创建新订阅,无法取消 }); // ✅ 正确示例 interval(1000).pipe( switchMap(() => http.get('/api/data')) ).subscribe(data => { console.log(data); }); // switchMap 会自动取消之前的订阅 ``` ## 检测内存泄漏 ### 1. 使用 Chrome DevTools ```javascript // 在组件中添加标记 class MyComponent { private id = Math.random(); ngOnDestroy() { console.log(`Component ${this.id} destroyed`); } } // 观察控制台,确认组件销毁时是否真的清理了订阅 ``` ### 2. 使用 RxJS 调试工具 ```typescript import { tap } from 'rxjs/operators'; http.get('/api/data').pipe( tap({ subscribe: () => console.log('Subscribed'), unsubscribe: () => console.log('Unsubscribed'), next: value => console.log('Next:', value), complete: () => console.log('Completed'), error: error => console.log('Error:', error) }) ).subscribe(); ``` ## 常见陷阱 ### 1. 忘记取消嵌套订阅 ```typescript // ❌ 错误示例 http.get('/api/user').subscribe(user => { http.get(`/api/posts/${user.id}`).subscribe(posts => { console.log(posts); }); // 内层订阅没有被管理 }); // ✅ 正确示例 http.get('/api/user').pipe( switchMap(user => http.get(`/api/posts/${user.id}`)) ).subscribe(posts => { console.log(posts); }); ``` ### 2. 在服务中创建订阅 ```typescript // ❌ 错误示例 @Injectable() export class DataService { constructor(private http: HttpClient) { this.http.get('/api/data').subscribe(data => { this.data = data; }); // 服务中的订阅很难取消 } } // ✅ 正确示例 @Injectable() export class DataService { private data$ = this.http.get('/api/data'); getData() { return this.data$; } } ``` ### 3. 忽略错误处理 ```typescript // ❌ 错误示例 http.get('/api/data').subscribe(data => { console.log(data); }); // 错误没有被处理,可能导致订阅无法正常完成 // ✅ 正确示例 http.get('/api/data').pipe( catchError(error => { console.error(error); return of([]); }) ).subscribe(data => { console.log(data); }); ``` ## 总结 防止 RxJS 内存泄漏的关键是: 1. **始终取消订阅**:特别是对于长期运行的 Observable 2. **使用 takeUntil**:这是最推荐的取消订阅方式 3. **避免嵌套订阅**:使用 switchMap、concatMap 等操作符 4. **使用 AsyncPipe**:在 Angular 中优先使用 AsyncPipe 5. **定期检查**:使用 DevTools 检测内存泄漏 6. **错误处理**:确保错误被正确处理,避免订阅卡住 遵循这些最佳实践,可以有效地防止 RxJS 应用中的内存泄漏问题。
前端 · 2026年2月21日 16:28