乐闻世界logo
搜索文章和话题

面试题手册

RxJS 中 Hot Observable 和 Cold Observable 有什么区别?

Hot Observable vs Cold ObservableCold Observable(冷 Observable)定义: Cold Observable 是惰性的,每个订阅者都会独立执行 Observable 的逻辑。特点:每个订阅者都会获得独立的数据流订阅时才开始执行不共享数据生产者不会主动推送数据示例:import { Observable } from 'rxjs';const cold$ = new Observable(subscriber => { console.log('Observable executed'); subscriber.next(Math.random()); subscriber.complete();});cold$.subscribe(value => console.log('Subscriber 1:', value));// Observable executed// Subscriber 1: 0.123456cold$.subscribe(value => console.log('Subscriber 2:', value));// Observable executed// Subscriber 2: 0.789012// 注意:每次订阅都重新执行,产生不同的随机数常见的 Cold Observable:of()from()interval()timer()ajax()http.get() (Angular)大多数创建操作符Hot Observable(热 Observable)定义: Hot Observable 是主动的,多个订阅者共享同一个数据流。特点:所有订阅者共享同一个数据流即使没有订阅者也会执行共享数据生产者主动推送数据示例:import { Observable, Subject } from 'rxjs';const hot$ = new Observable(subscriber => { console.log('Observable executed'); subscriber.next(Math.random()); subscriber.complete();});const subject = new Subject();hot$.subscribe(subject);subject.subscribe(value => console.log('Subscriber 1:', value));// Observable executed// Subscriber 1: 0.123456subject.subscribe(value => console.log('Subscriber 2:', value));// Subscriber 2: 0.123456// 注意:两个订阅者收到相同的值常见的 Hot Observable:Subject 及其变体BehaviorSubjectReplaySubjectAsyncSubjectDOM 事件(通过 fromEvent)WebSocket 连接share() 转换后的 Observable转换方法1. 使用 share() 将 Cold 转换为 Hotimport { interval } from 'rxjs';import { share, take } from 'rxjs/operators';const cold$ = interval(1000).pipe( take(5));const hot$ = cold$.pipe( share() // 转换为 Hot Observable);hot$.subscribe(value => console.log('Subscriber 1:', value));hot$.subscribe(value => console.log('Subscriber 2:', value));// 两个订阅者共享同一个数据流2. 使用 shareReplay() 缓存值import { interval } from 'rxjs';import { shareReplay, take } from 'rxjs/operators';const hot$ = interval(1000).pipe( take(5), shareReplay(1) // 缓存最后一个值);hot$.subscribe(value => console.log('Subscriber 1:', value));setTimeout(() => { hot$.subscribe(value => console.log('Subscriber 2:', value)); // 新订阅者会立即收到缓存的值}, 3000);3. 使用 publish() 和 connect()import { interval } from 'rxjs';import { publish, take } from 'rxjs/operators';const cold$ = interval(1000).pipe( take(5));const hot$ = cold$.pipe( publish() // 转换为 Hot Observable);hot$.subscribe(value => console.log('Subscriber 1:', value));hot$.subscribe(value => console.log('Subscriber 2:', value));hot$.connect(); // 开始执行实际应用场景Cold Observable 适用场景HTTP 请求// 每次订阅都会发起新的请求http.get('/api/data').subscribe(data => { console.log('Request 1:', data);});http.get('/api/data').subscribe(data => { console.log('Request 2:', data);});独立的数据处理// 每个订阅者需要独立的数据流of(1, 2, 3).pipe( map(x => x * 2)).subscribe(value => console.log(value));需要重新执行的场景// 每次订阅都重新计算const calculation$ = new Observable(subscriber => { const result = expensiveCalculation(); subscriber.next(result); subscriber.complete();});Hot Observable 适用场景共享数据// 多个组件共享同一个数据流const userData$ = http.get('/api/user').pipe( share());component1.userData$.subscribe(user => { console.log('Component 1:', user);});component2.userData$.subscribe(user => { console.log('Component 2:', user);});// 只发起一次请求,两个组件共享结果事件流// 多个订阅者监听同一个事件const click$ = fromEvent(document, 'click').pipe( share());click$.subscribe(event => { console.log('Handler 1:', event);});click$.subscribe(event => { console.log('Handler 2:', event);});WebSocket 连接// 多个订阅者共享同一个 WebSocket 连接const socket$ = webSocket('ws://localhost:8080').pipe( share());socket$.subscribe(message => { console.log('Handler 1:', message);});socket$.subscribe(message => { console.log('Handler 2:', message);});性能对比Cold Observable 性能特点优点:每个订阅者获得独立的数据流不会相互影响适合需要独立处理的场景缺点:可能重复执行相同的操作浪费资源(如重复的 HTTP 请求)内存占用可能更高Hot Observable 性能特点优点:共享数据流,避免重复执行节省资源(如只发起一次 HTTP 请求)内存占用更低缺点:订阅者可能错过之前的数据需要管理订阅时机可能出现竞态条件选择指南使用 Cold Observable 当:每个订阅者需要独立的数据流需要重新执行操作订阅者之间不应该相互影响数据源是按需生成的使用 Hot Observable 当:多个订阅者需要共享数据需要避免重复执行(如 HTTP 请求)数据源是主动推送的(如事件、WebSocket)需要缓存数据供后续订阅者使用最佳实践1. HTTP 请求共享// ❌ 错误:每次订阅都发起请求class UserService { getUser(id: string) { return http.get(`/api/users/${id}`); }}// ✅ 正确:共享请求结果class UserService { private cache = new Map<string, Observable<User>>(); getUser(id: string) { if (!this.cache.has(id)) { this.cache.set(id, http.get(`/api/users/${id}`).pipe( shareReplay(1) )); } return this.cache.get(id)!; }}2. 事件处理// 使用 share() 共享事件流const resize$ = fromEvent(window, 'resize').pipe( debounceTime(200), share());resize$.subscribe(event => { updateLayout1(event);});resize$.subscribe(event => { updateLayout2(event);});3. 状态管理// 使用 BehaviorSubject 管理状态const state$ = new BehaviorSubject(initialState);state$.subscribe(state => { console.log('Listener 1:', state);});state$.subscribe(state => { console.log('Listener 2:', state);});// 更新状态state$.next(newState);常见陷阱1. 忘记共享导致重复请求// ❌ 错误示例const data$ = http.get('/api/data');data$.subscribe(data => console.log('Component 1:', data));data$.subscribe(data => console.log('Component 2:', data));// 发起两次请求// ✅ 正确示例const data$ = http.get('/api/data').pipe( share());data$.subscribe(data => console.log('Component 1:', data));data$.subscribe(data => console.log('Component 2:', data));// 只发起一次请求2. 错误的共享时机// ❌ 错误示例const data$ = http.get('/api/data').pipe( share());// 立即订阅触发请求data$.subscribe();// 后续订阅者可能错过数据setTimeout(() => { data$.subscribe(data => console.log(data));}, 2000);// ✅ 正确示例const data$ = http.get('/api/data').pipe( shareReplay(1) // 缓存数据);3. 不当使用 shareReplay// ❌ 错误示例:缓存过多数据const data$ = interval(1000).pipe( shareReplay(1000) // 缓存1000个值,占用大量内存);// ✅ 正确示例:合理设置缓存大小const data$ = interval(1000).pipe( shareReplay(1) // 只缓存最后一个值);总结理解 Hot 和 Cold Observable 的区别对于编写高效的 RxJS 代码至关重要:Cold Observable: 惰性、独立执行、适合按需生成的数据Hot Observable: 主动、共享执行、适合主动推送的数据转换方法: 使用 share()、shareReplay() 等操作符进行转换性能考虑: Hot Observable 可以避免重复执行,提高性能选择原则: 根据场景选择合适的类型,避免不必要的资源浪费正确使用这两种 Observable 类型,可以显著提升应用的性能和可维护性。
阅读 0·2026年2月21日 16:59

RxJS 中的 Marble Testing 是什么?如何使用?

Marble Testing 的概念Marble Testing 是 RxJS 中一种基于字符串的可视化测试方法,它使用特殊的语法来表示 Observable 的时间流和事件。这种方法让异步测试变得直观和易于理解。Marble 语法基本符号| 符号 | 含义 ||------|------|| - | 时间流逝(1帧,约10ms)|| a, b, c | 发出的值 || | | 完成 || # | 错误 || () | 同步发出 || ^ | 订阅点(hot Observable)|| ! | 取消订阅 |示例// 基本示例const source$ = cold('-a-b-c-|');// 含义:10ms后发出a,20ms后发出b,30ms后发出c,40ms后完成// 错误示例const error$ = cold('-a-b-#');// 含义:10ms后发出a,20ms后发出b,30ms后出错// 同步示例const sync$ = cold('(abc|)');// 含义:同步发出a、b、c,然后完成// Hot Observableconst hot$ = hot('^-a-b-c-|');// 含义:从订阅点开始,10ms后发出a,20ms后发出b,30ms后发出c,40ms后完成TestScheduler 的使用基本设置import { TestScheduler } from 'rxjs/testing';describe('My Observable Tests', () => { let testScheduler: TestScheduler; beforeEach(() => { testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); });});测试基本操作符import { of } from 'rxjs';import { map, filter } from 'rxjs/operators';it('should map values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe( map(x => x.toUpperCase()) ); expectObservable(result$).toBe(expected, { a: 'a', b: 'b', c: 'c' }); });});it('should filter values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-d-|'); const expected = '-a-c---|'; const result$ = source$.pipe( filter(x => ['a', 'c'].includes(x)) ); expectObservable(result$).toBe(expected); });});测试时间相关操作符import { of } from 'rxjs';import { delay, debounceTime, throttleTime } from 'rxjs/operators';it('should delay emissions', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '---a-b-c-|'; // 延迟30ms const result$ = source$.pipe( delay(30, testScheduler) ); expectObservable(result$).toBe(expected); });});it('should debounce', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a--b---c-|'); const expected = '-----b---c-|'; const result$ = source$.pipe( debounceTime(20, testScheduler) ); expectObservable(result$).toBe(expected); });});it('should throttle', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-d-|'); const expected = '-a---c---|'; const result$ = source$.pipe( throttleTime(30, testScheduler) ); expectObservable(result$).toBe(expected); });});测试组合操作符import { of, merge, concat, combineLatest } from 'rxjs';it('should merge observables', () => { testScheduler.run(({ cold, expectObservable }) => { const source1$ = cold('-a---b-|'); const source2$ = cold('--c-d---|'); const expected = '-a-c-b-d-|'; const result$ = merge(source1$, source2$); expectObservable(result$).toBe(expected); });});it('should concatenate observables', () => { testScheduler.run(({ cold, expectObservable }) => { const source1$ = cold('-a-b-|'); const source2$ = cold('--c-d-|'); const expected = '-a-b--c-d-|'; const result$ = concat(source1$, source2$); expectObservable(result$).toBe(expected); });});it('should combine latest', () => { testScheduler.run(({ cold, expectObservable }) => { const source1$ = cold('-a---b-|'); const source2$ = cold('--c-d---|'); const expected = '----ab-bd-|'; const result$ = combineLatest([source1$, source2$]); expectObservable(result$).toBe(expected); });});测试错误处理import { of, throwError } from 'rxjs';import { catchError, retry } from 'rxjs/operators';it('should catch errors', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-#'); const expected = '-a-b-(d|)'; const result$ = source$.pipe( catchError(() => of('d')) ); expectObservable(result$).toBe(expected); });});it('should retry on error', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-#'); const expected = '-a-a-#'; const result$ = source$.pipe( retry(1) ); expectObservable(result$).toBe(expected); });});测试订阅和取消订阅import { interval } from 'rxjs';import { take } from 'rxjs/operators';it('should handle subscription', () => { testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c-|'); const subs = '^------!'; const result$ = source$.pipe(take(2)); expectObservable(result$).toBe('-a-b-|'); expectSubscriptions(source$.subscriptions).toBe(subs); });});it('should handle unsubscription', () => { testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c-d-|'); const subs = '^---!'; const result$ = source$.pipe(take(2)); expectObservable(result$).toBe('-a-b-|'); expectSubscriptions(source$.subscriptions).toBe(subs); });});实际应用示例1. 测试搜索功能import { of } from 'rxjs';import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';function search(query: string) { return of(`Results for ${query}`);}it('should search with debounce', () => { testScheduler.run(({ cold, expectObservable }) => { const input$ = cold('-a--b---c-|'); const expected = '-----b---c-|'; const result$ = input$.pipe( debounceTime(20, testScheduler), distinctUntilChanged(), switchMap(query => search(query)) ); expectObservable(result$).toBe(expected); });});2. 测试自动保存import { of } from 'rxjs';import { debounceTime, switchMap } from 'rxjs/operators';function save(data: any) { return of('Saved');}it('should auto-save with debounce', () => { testScheduler.run(({ cold, expectObservable }) => { const changes$ = cold('-a--b---c-|'); const expected = '-----b---c-|'; const result$ = changes$.pipe( debounceTime(20, testScheduler), switchMap(data => save(data)) ); expectObservable(result$).toBe(expected); });});3. 测试轮询功能import { interval } from 'rxjs';import { take, map } from 'rxjs/operators';it('should poll at intervals', () => { testScheduler.run(({ cold, expectObservable }) => { const expected = '-a-b-c-d-e-|'; const result$ = interval(10, testScheduler).pipe( take(5), map(x => String.fromCharCode(97 + x)) ); expectObservable(result$).toBe(expected); });});4. 测试缓存功能import { of } from 'rxjs';import { shareReplay } from 'rxjs/operators';it('should cache values', () => { testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c-|'); const expected = '-a-b-c-|'; const subs = ['^------!', ' ^-!']; const cached$ = source$.pipe(shareReplay(1)); expectObservable(cached$).toBe(expected); expectObservable(cached$).toBe('--c-|'); expectSubscriptions(source$.subscriptions).toBe(subs); });});高级用法1. 测试 Hot Observableit('should handle hot observable', () => { testScheduler.run(({ hot, expectObservable }) => { const source$ = hot('--a--b--c--|'); const sub = '---^--------!'; const expected = '--b--c--|'; const result$ = source$.pipe(take(2)); expectObservable(result$, sub).toBe(expected); });});2. 测试多播import { of } from 'rxjs';import { share, multicast } from 'rxjs/operators';import { Subject } from 'rxjs';it('should multicast correctly', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-a-b-c-|'; const shared$ = source$.pipe(share()); expectObservable(shared$).toBe(expected); expectObservable(shared$).toBe(expected); });});3. 测试自定义操作符import { Observable } from 'rxjs';import { OperatorFunction } from 'rxjs';function customMap<T, R>(project: (value: T) => R): OperatorFunction<T, R> { return (source$) => new Observable(subscriber => { return source$.subscribe({ next: value => { try { subscriber.next(project(value)); } catch (error) { subscriber.error(error); } }, error: error => subscriber.error(error), complete: () => subscriber.complete() }); });}it('should use custom operator', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe( customMap(x => x.toUpperCase()) ); expectObservable(result$).toBe(expected); });});最佳实践1. 使用有意义的值// ✅ 好的做法it('should map values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe(map(x => x.toUpperCase())); expectObservable(result$).toBe(expected, { a: 'a', b: 'b', c: 'c', A: 'A', B: 'B', C: 'C' }); });});// ❌ 不好的做法it('should map values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-d-e-f-|'; const result$ = source$.pipe(map(x => x.toUpperCase())); expectObservable(result$).toBe(expected); });});2. 测试边界情况it('should handle empty observable', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('|'); const expected = '|'; const result$ = source$.pipe(map(x => x.toUpperCase())); expectObservable(result$).toBe(expected); });});it('should handle error observable', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-#'); const expected = '-#'; const result$ = source$.pipe(map(x => x.toUpperCase())); expectObservable(result$).toBe(expected); });});3. 使用 expectSubscriptionsit('should subscribe and unsubscribe correctly', () => { testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c-|'); const subs = '^------!'; const result$ = source$.pipe(take(3)); expectObservable(result$).toBe('-a-b-c-|'); expectSubscriptions(source$.subscriptions).toBe(subs); });});总结Marble Testing 是 RxJS 中强大的测试工具,它提供了:可视化测试: 使用字符串表示时间流,直观易懂时间控制: 精确控制异步操作的时序易于维护: 清晰的语法和结构全面覆盖: 可以测试各种操作符和场景掌握 Marble Testing 可以显著提升 RxJS 代码的测试质量和开发效率。
阅读 0·2026年2月21日 16:58

RxJS 中 switchMap、mergeMap、concatMap 有什么区别?

switchMap、mergeMap、concatMap 的核心区别这三个操作符都是用于处理高阶 Observable(Observable of Observable)的,但它们的处理策略完全不同:1. switchMap特点: 取消之前的内部 Observable,只处理最新的工作原理:当新的值到达时,取消之前未完成的 Observable只保留最新的 Observable 的结果适合需要取消旧请求的场景示例代码:import { of, interval } from 'rxjs';import { switchMap, take, delay } from 'rxjs/operators';// 模拟异步请求function makeRequest(id) { return of(`Result ${id}`).pipe(delay(1000));}interval(500).pipe( switchMap(id => makeRequest(id)), take(5)).subscribe(console.log);// 输出: Result 4// 解释: 因为每500ms触发一次,但请求需要1000ms// 所以每次新请求来时,之前的请求都被取消了// 最终只有最后一个请求完成了实际应用场景:搜索框输入:每次输入都取消上一次的搜索请求自动完成:只显示最新输入的结果导航切换:取消未完成的页面加载// 搜索框示例searchInput.pipe( switchMap(query => searchAPI(query))).subscribe(results => { // 只显示最新搜索的结果 displayResults(results);});2. mergeMap特点: 并行处理所有内部 Observable工作原理:同时订阅所有内部 Observable所有 Observable 的结果都会被发出不保证顺序,结果可能交错示例代码:import { of } from 'rxjs';import { mergeMap, delay } from 'rxjs/operators';function makeRequest(id) { return of(`Result ${id}`).pipe(delay(id * 200));}of(1, 2, 3).pipe( mergeMap(id => makeRequest(id))).subscribe(console.log);// 输出: Result 1, Result 2, Result 3// 解释: 所有请求并行执行,按完成顺序输出实际应用场景:并行加载多个资源批量处理独立任务不需要顺序的并发请求// 并行加载用户数据示例merge( getUserProfile(userId), getUserPosts(userId), getUserComments(userId)).pipe( mergeMap(response => response.json())).subscribe(data => { // 所有数据并行加载完成 updateUI(data);});3. concatMap特点: 顺序处理内部 Observable,一个完成后再处理下一个工作原理:按顺序订阅内部 Observable当前 Observable 完成后才订阅下一个保证结果的顺序示例代码:import { of } from 'rxjs';import { concatMap, delay } from 'rxjs/operators';function makeRequest(id) { return of(`Result ${id}`).pipe(delay(500));}of(1, 2, 3).pipe( concatMap(id => makeRequest(id))).subscribe(console.log);// 输出: Result 1, Result 2, Result 3// 解释: 每个请求按顺序执行,前一个完成后才执行下一个实际应用场景:需要保证顺序的请求依赖前一个结果的后续请求防止服务器过载// 顺序上传文件示例files.pipe( concatMap(file => uploadFile(file))).subscribe(result => { // 文件按顺序上传 console.log('Uploaded:', result);});对比总结| 特性 | switchMap | mergeMap | concatMap ||------|-----------|----------|-----------|| 执行方式 | 取消旧的,只保留最新的 | 并行执行所有 | 顺序执行 || 结果顺序 | 只保留最新结果 | 不保证顺序 | 保证顺序 || 并发数 | 1(同时只有1个) | 无限制 | 1(同时只有1个) || 适用场景 | 搜索、自动完成 | 并行加载 | 顺序处理 || 性能 | 最快(取消旧请求) | 最快(并行) | 较慢(顺序) || 内存占用 | 低 | 高 | 低 |选择指南使用 switchMap 当:需要取消旧请求只关心最新结果搜索、自动完成等场景避免不必要的网络请求使用 mergeMap 当:需要并行处理请求之间没有依赖需要最大化性能不关心结果顺序使用 concatMap 当:需要保证顺序请求之间有依赖需要限制并发数避免服务器过载性能考虑// 性能对比示例const source = interval(100).pipe(take(10));// switchMap: 只处理最后一个source.pipe( switchMap(x => of(x).pipe(delay(1000)))).subscribe();// mergeMap: 并行处理所有(可能造成性能问题)source.pipe( mergeMap(x => of(x).pipe(delay(1000)))).subscribe();// concatMap: 顺序处理(可能较慢)source.pipe( concatMap(x => of(x).pipe(delay(1000)))).subscribe();实际项目中的选择// 1. 搜索功能 - 使用 switchMapsearchInput.pipe( debounceTime(300), distinctUntilChanged(), switchMap(query => searchAPI(query))).subscribe(results => displayResults(results));// 2. 批量加载 - 使用 mergeMapproductIds.pipe( mergeMap(id => getProductDetails(id))).subscribe(products => renderProducts(products));// 3. 顺序操作 - 使用 concatMapcommands.pipe( concatMap(command => executeCommand(command))).subscribe(result => logResult(result));注意事项内存泄漏: mergeMap 可能创建大量并发请求,需要注意内存使用取消逻辑: switchMap 会自动取消,但 concatMap 和 mergeMap 不会错误处理: 任何一个内部 Observable 出错都会导致整个流失败取消订阅: 始终记得取消订阅以避免内存泄漏最佳实践// 1. 结合错误处理searchInput.pipe( switchMap(query => searchAPI(query).pipe( catchError(error => of([])) ) )).subscribe(results => displayResults(results));// 2. 限制并发数(使用 mergeMap)import { mergeMap } from 'rxjs/operators';source.pipe( mergeMap(value => process(value), 3) // 限制并发数为3).subscribe();// 3. 添加重试机制source.pipe( switchMap(value => apiCall(value).pipe( retry(3), catchError(error => of(defaultValue)) ) )).subscribe();
阅读 0·2026年2月21日 16:55

RxJS 中 Subject、BehaviorSubject、ReplaySubject 和 AsyncSubject 有什么区别?

Subject 的核心概念Subject 是 RxJS 中一种特殊的 Observable,它既是 Observable 又是 Observer。这意味着它可以:被多个观察者订阅主动推送新值给所有订阅者实现多播(multicast)功能Subject 类型1. Subject基础 Subject,每次有新订阅者时,不会回放之前的值const subject = new Subject();subject.next(1);subject.next(2);const subscription1 = subject.subscribe(value => console.log('订阅者1:', value));// 订阅者1: 3// 订阅者1: 4subject.next(3);subject.next(4);const subscription2 = subject.subscribe(value => console.log('订阅者2:', value));// 订阅者2: 5// 订阅者2: 6subject.next(5);subject.next(6);2. BehaviorSubjectBehaviorSubject 会记住最新的值,新订阅者会立即接收到当前值const behaviorSubject = new BehaviorSubject('初始值');behaviorSubject.subscribe(value => console.log('订阅者1:', value));// 订阅者1: 初始值behaviorSubject.next('值1');// 订阅者1: 值1behaviorSubject.subscribe(value => console.log('订阅者2:', value));// 订阅者2: 值1 - 立即收到最新值behaviorSubject.next('值2');// 订阅者1: 值2// 订阅者2: 值23. ReplaySubjectReplaySubject 可以回放指定数量的历史值const replaySubject = new ReplaySubject(2); // 回放最后2个值replaySubject.next('值1');replaySubject.next('值2');replaySubject.next('值3');replaySubject.subscribe(value => console.log('订阅者1:', value));// 订阅者1: 值2// 订阅者1: 值3replaySubject.next('值4');// 订阅者1: 值4replaySubject.subscribe(value => console.log('订阅者2:', value));// 订阅者2: 值3// 订阅者2: 值44. AsyncSubjectAsyncSubject 只在完成时发出最后一个值const asyncSubject = new AsyncSubject();asyncSubject.subscribe(value => console.log('订阅者1:', value));asyncSubject.next('值1');asyncSubject.next('值2');asyncSubject.next('值3');// 此时还没有输出asyncSubject.complete();// 订阅者1: 值3 - 只发出最后一个值asyncSubject.subscribe(value => console.log('订阅者2:', value));// 订阅者2: 值3 - 新订阅者也能收到最后一个值实际应用场景Subject 使用场景事件总线状态管理多个组件共享数据流BehaviorSubject 使用场景存储当前状态表单状态管理用户信息管理ReplaySubject 使用场景缓存历史数据重播操作日志错误重试机制AsyncSubject 使用场景HTTP 请求缓存只需要最终结果的计算异步操作的最后值与普通 Observable 的区别// 普通 Observable - 单播const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2);});observable.subscribe(v => console.log('观察者1:', v));observable.subscribe(v => console.log('观察者2:', v));// 每个订阅者独立执行// Subject - 多播const subject = new Subject();const source = new Observable(subscriber => { subscriber.next(1); subscriber.next(2);});source.subscribe(subject);subject.subscribe(v => console.log('订阅者1:', v));subject.subscribe(v => console.log('订阅者2:', v));// 所有订阅者共享同一个数据流性能考虑Subject 在多播场景下可以提高性能,避免重复执行相同的异步操作。但需要注意内存泄漏问题,及时取消订阅。
阅读 0·2026年2月21日 16:55

RxJS 中如何进行性能优化?

RxJS 性能优化的核心原则RxJS 性能优化主要关注以下几个方面:减少不必要的订阅和取消订阅优化操作符链的执行效率合理使用多播和缓存避免内存泄漏减少计算开销优化策略1. 使用 share 和 shareReplay避免重复执行相同的 Observable。import { of } from 'rxjs';import { share, shareReplay } from 'rxjs/operators';// ❌ 不好的做法:每次订阅都重新执行const data$ = http.get('/api/data');data$.subscribe(data => console.log('Subscriber 1:', data));data$.subscribe(data => console.log('Subscriber 2:', data));// 发起两次 HTTP 请求// ✅ 好的做法:共享 Observableconst shared$ = http.get('/api/data').pipe( share() // 或 shareReplay(1));shared$.subscribe(data => console.log('Subscriber 1:', data));shared$.subscribe(data => console.log('Subscriber 2:', data));// 只发起一次 HTTP 请求2. 使用 debounceTime 和 throttleTime减少高频事件的处理频率。import { fromEvent } from 'rxjs';import { debounceTime, throttleTime } from 'rxjs/operators';// ❌ 不好的做法:处理每个滚动事件fromEvent(window, 'scroll').subscribe(event => { handleScroll(event); // 可能每秒触发数百次});// ✅ 好的做法:节流处理fromEvent(window, 'scroll').pipe( throttleTime(200) // 每 200ms 最多处理一次).subscribe(event => { handleScroll(event);});// ✅ 好的做法:防抖处理fromEvent(searchInput, 'input').pipe( debounceTime(300) // 停止输入 300ms 后才处理).subscribe(event => { search(event.target.value);});3. 使用 distinctUntilChanged避免处理重复的值。import { fromEvent } from 'rxjs';import { debounceTime, distinctUntilChanged } from 'rxjs/operators';// ❌ 不好的做法:可能处理相同的搜索词fromEvent(searchInput, 'input').pipe( debounceTime(300)).subscribe(event => { search(event.target.value);});// ✅ 好的做法:只在值变化时处理fromEvent(searchInput, 'input').pipe( debounceTime(300), distinctUntilChanged() // 避免重复搜索).subscribe(event => { search(event.target.value);});4. 使用 take 和 takeWhile及时取消不再需要的订阅。import { interval } from 'rxjs';import { take, takeWhile } from 'rxjs/operators';// ❌ 不好的做法:无限订阅interval(1000).subscribe(value => { console.log(value); // 需要手动取消订阅});// ✅ 好的做法:自动取消订阅interval(1000).pipe( take(10) // 只取 10 个值).subscribe(value => { console.log(value);});// ✅ 好的做法:条件取消interval(1000).pipe( takeWhile(value => value < 10)).subscribe(value => { console.log(value);});5. 使用 switchMap 而不是 mergeMap在需要取消旧操作的场景中。import { fromEvent } from 'rxjs';import { switchMap, mergeMap } from 'rxjs/operators';// ❌ 不好的做法:所有请求都会完成fromEvent(searchInput, 'input').pipe( debounceTime(300), mergeMap(query => searchAPI(query)) // 所有请求都会完成).subscribe(results => { displayResults(results);});// ✅ 好的做法:取消旧请求fromEvent(searchInput, 'input').pipe( debounceTime(300), switchMap(query => searchAPI(query)) // 取消旧请求).subscribe(results => { displayResults(results);});6. 使用 buffer 和 bufferTime批量处理数据,减少处理次数。import { interval } from 'rxjs';import { bufferTime } from 'rxjs/operators';// ❌ 不好的做法:逐个处理interval(100).pipe( take(100)).subscribe(value => { processItem(value); // 处理 100 次});// ✅ 好的做法:批量处理interval(100).pipe( take(100), bufferTime(1000) // 每秒批量处理).subscribe(buffer => { processBatch(buffer); // 只处理 10 次});7. 使用 filter 提前过滤尽早过滤掉不需要的数据。import { of } from 'rxjs';import { map, filter } from 'rxjs/operators';// ❌ 不好的做法:先处理再过滤of(1, 2, 3, 4, 5).pipe( map(x => { console.log('Processing:', x); // 处理所有值 return x * 2; }), filter(x => x > 5)).subscribe(value => { console.log('Result:', value);});// ✅ 好的做法:先过滤再处理of(1, 2, 3, 4, 5).pipe( filter(x => x > 2), // 先过滤 map(x => { console.log('Processing:', x); // 只处理需要的值 return x * 2; })).subscribe(value => { console.log('Result:', value);});8. 使用 finalize 清理资源确保资源被正确释放。import { interval } from 'rxjs';import { take, finalize } from 'rxjs/operators';// ❌ 不好的做法:可能忘记清理const subscription = interval(1000).pipe( take(10)).subscribe(value => { console.log(value);});// 可能忘记取消订阅// ✅ 好的做法:自动清理interval(1000).pipe( take(10), finalize(() => { console.log('Cleaning up...'); cleanupResources(); })).subscribe(value => { console.log(value);});高级优化技巧1. 使用 combineLatest 而不是嵌套订阅// ❌ 不好的做法:嵌套订阅this.userService.getUser(userId).subscribe(user => { this.postsService.getPosts(user.id).subscribe(posts => { this.commentsService.getComments(posts[0].id).subscribe(comments => { // 处理数据 }); });});// ✅ 好的做法:使用 combineLatestcombineLatest([ this.userService.getUser(userId), this.postsService.getPosts(userId), this.commentsService.getComments(postId)]).pipe( map(([user, posts, comments]) => ({ user, posts, comments }))).subscribe(data => { // 处理数据});2. 使用 mergeMap 限制并发import { of } from 'rxjs';import { mergeMap } from 'rxjs/operators';// ❌ 不好的做法:可能同时发起大量请求const ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];from(ids).pipe( mergeMap(id => fetchData(id)) // 可能同时发起 10 个请求).subscribe(result => { console.log(result);});// ✅ 好的做法:限制并发数from(ids).pipe( mergeMap(id => fetchData(id), 3) // 最多同时 3 个请求).subscribe(result => { console.log(result);});3. 使用 ReplaySubject 缓存数据import { ReplaySubject } from 'rxjs';// ❌ 不好的做法:每次都重新获取class DataService { getData() { return http.get('/api/data'); }}// ✅ 好的做法:缓存数据class DataService { private cache$ = new ReplaySubject(1); getData() { if (!this.hasFetched) { http.get('/api/data').subscribe(data => { this.cache$.next(data); this.hasFetched = true; }); } return this.cache$.asObservable(); }}4. 使用 scan 而不是 reducescan 可以持续发出中间结果,而 reduce 只在完成时发出结果。import { of } from 'rxjs';import { scan, reduce } from 'rxjs/operators';// ❌ 不好的做法:只在完成时得到结果of(1, 2, 3, 4, 5).pipe( reduce((sum, value) => sum + value, 0)).subscribe(sum => { console.log('Final sum:', sum); // 只输出一次});// ✅ 好的做法:持续输出中间结果of(1, 2, 3, 4, 5).pipe( scan((sum, value) => sum + value, 0)).subscribe(sum => { console.log('Current sum:', sum); // 输出 5 次});5. 使用 tap 进行调试import { of } from 'rxjs';import { map, filter, tap } from 'rxjs/operators';// ✅ 好的做法:使用 tap 调试of(1, 2, 3, 4, 5).pipe( tap(value => console.log('Input:', value)), filter(x => x > 2), tap(value => console.log('After filter:', value)), map(x => x * 2), tap(value => console.log('After map:', value))).subscribe(value => { console.log('Output:', value);});性能监控1. 使用 performance.now() 测量性能import { of } from 'rxjs';import { map, tap } from 'rxjs/operators';const startTime = performance.now();of(1, 2, 3, 4, 5).pipe( map(x => x * 2), tap(() => { const elapsed = performance.now() - startTime; console.log(`Elapsed: ${elapsed.toFixed(2)}ms`); })).subscribe();2. 使用 count 统计数据量import { of } from 'rxjs';import { count, tap } from 'rxjs/operators';of(1, 2, 3, 4, 5).pipe( tap(value => console.log('Processing:', value)), count()).subscribe(count => { console.log('Total processed:', count);});常见性能问题1. 过多的订阅// ❌ 不好的做法:创建多个订阅const data$ = http.get('/api/data');data$.subscribe(data => updateUI1(data));data$.subscribe(data => updateUI2(data));data$.subscribe(data => updateUI3(data));// ✅ 好的做法:共享订阅const data$ = http.get('/api/data').pipe( share());data$.subscribe(data => updateUI1(data));data$.subscribe(data => updateUI2(data));data$.subscribe(data => updateUI3(data));2. 内存泄漏// ❌ 不好的做法:忘记取消订阅class MyComponent { ngOnInit() { this.dataService.getData().subscribe(data => { this.data = data; }); } // 组件销毁时订阅仍然存在}// ✅ 好的做法:正确取消订阅class MyComponent implements OnDestroy { private destroy$ = new Subject<void>(); ngOnInit() { this.dataService.getData().pipe( takeUntil(this.destroy$) ).subscribe(data => { this.data = data; }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}3. 不必要的计算// ❌ 不好的做法:重复计算of(1, 2, 3).pipe( map(x => { const result = expensiveCalculation(x); return result; }), map(x => { const result = expensiveCalculation(x); // 重复计算 return result; })).subscribe();// ✅ 好的做法:避免重复计算of(1, 2, 3).pipe( map(x => { const result = expensiveCalculation(x); return { value: x, result }; })).subscribe(data => { console.log(data.value, data.result);});最佳实践1. 使用 AsyncPipe@Component({ template: ` <div *ngIf="data$ | async as data"> {{ data }} </div> `})export class MyComponent { data$ = this.service.getData(); // AsyncPipe 自动管理订阅}2. 使用 takeUntilexport class MyComponent implements OnDestroy { private destroy$ = new Subject<void>(); ngOnInit() { this.service.getData().pipe( takeUntil(this.destroy$) ).subscribe(data => { this.data = data; }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}3. 使用 shareReplay@Injectable()export class DataService { private cache = new Map<string, Observable<any>>(); getData(key: string) { if (!this.cache.has(key)) { this.cache.set(key, http.get(`/api/data/${key}`).pipe( shareReplay(1) )); } return this.cache.get(key)!; }}总结RxJS 性能优化的关键点:共享订阅: 使用 share 和 shareReplay 避免重复执行减少处理频率: 使用 debounceTime 和 throttleTime过滤重复数据: 使用 distinctUntilChanged及时取消订阅: 使用 take 和 takeWhile选择合适的操作符: switchMap vs mergeMap vs concatMap批量处理: 使用 buffer 和 bufferTime提前过滤: 使用 filter 尽早过滤不需要的数据清理资源: 使用 finalize 确保资源释放避免嵌套订阅: 使用 combineLatest 和 forkJoin限制并发: 使用 mergeMap 的并发参数掌握这些优化技巧可以显著提升 RxJS 应用的性能。
阅读 0·2026年2月21日 16:54

RxJS 在 Angular 中如何应用?

RxJS 在 Angular 中的应用RxJS 是 Angular 框架的核心依赖,广泛应用于异步操作、事件处理和数据流管理。核心应用场景1. HTTP 请求Angular 的 HttpClient 返回 Observable,便于处理异步请求。import { HttpClient } from '@angular/common/http';import { Observable } from 'rxjs';@Injectable({ providedIn: 'root'})export class DataService { constructor(private http: HttpClient) {} getUsers(): Observable<User[]> { return this.http.get<User[]>('/api/users'); } getUserById(id: number): Observable<User> { return this.http.get<User>(`/api/users/${id}`); } createUser(user: User): Observable<User> { return this.http.post<User>('/api/users', user); } updateUser(id: number, user: User): Observable<User> { return this.http.put<User>(`/api/users/${id}`, user); } deleteUser(id: number): Observable<void> { return this.http.delete<void>(`/api/users/${id}`); }}在组件中使用:import { Component, OnInit } from '@angular/core';import { DataService } from './data.service';@Component({ selector: 'app-user-list', template: ` <div *ngIf="users$ | async as users"> <div *ngFor="let user of users"> {{ user.name }} </div> </div> `})export class UserListComponent implements OnInit { users$: Observable<User[]>; constructor(private dataService: DataService) {} ngOnInit() { this.users$ = this.dataService.getUsers(); }}2. 表单处理Angular 的响应式表单与 RxJS 完美集成。import { Component, OnInit } from '@angular/core';import { FormBuilder, FormGroup, Validators } from '@angular/forms';@Component({ selector: 'app-search-form', template: ` <form [formGroup]="searchForm"> <input formControlName="search" placeholder="Search..."> </form> `})export class SearchFormComponent implements OnInit { searchForm: FormGroup; constructor(private fb: FormBuilder) { this.searchForm = this.fb.group({ search: ['', Validators.minLength(3)] }); } ngOnInit() { // 监听搜索输入 this.searchForm.get('search')?.valueChanges.pipe( debounceTime(300), distinctUntilChanged(), filter(query => query.length >= 3), switchMap(query => this.search(query)) ).subscribe(results => { this.displayResults(results); }); } search(query: string): Observable<SearchResult[]> { return this.http.get<SearchResult[]>(`/api/search?q=${query}`); } displayResults(results: SearchResult[]) { // 显示搜索结果 }}3. 路由处理使用 RxJS 处理路由参数和查询参数。import { Component, OnInit } from '@angular/core';import { ActivatedRoute, Router } from '@angular/router';@Component({ selector: 'app-user-detail', template: ` <div *ngIf="user$ | async as user"> <h1>{{ user.name }}</h1> <p>{{ user.email }}</p> </div> `})export class UserDetailComponent implements OnInit { user$: Observable<User>; constructor( private route: ActivatedRoute, private router: Router, private dataService: DataService ) {} ngOnInit() { // 监听路由参数变化 this.user$ = this.route.paramMap.pipe( switchMap(params => { const id = Number(params.get('id')); return this.dataService.getUserById(id); }) ); } navigateToUser(id: number) { this.router.navigate(['/users', id]); }}4. 状态管理使用 BehaviorSubject 或 NgRx 进行状态管理。简单状态管理:import { Injectable } from '@angular/core';import { BehaviorSubject, Observable } from 'rxjs';@Injectable({ providedIn: 'root'})export class StateService { private state$ = new BehaviorSubject<AppState>({ user: null, isLoading: false, error: null }); getState(): Observable<AppState> { return this.state$.asObservable(); } updateUser(user: User) { const currentState = this.state$.value; this.state$.next({ ...currentState, user }); } setLoading(loading: boolean) { const currentState = this.state$.value; this.state$.next({ ...currentState, isLoading: loading }); }}在组件中使用:@Component({ selector: 'app-app', template: ` <div *ngIf="state$ | async as state"> <div *ngIf="state.isLoading">Loading...</div> <div *ngIf="state.user">Welcome, {{ state.user.name }}</div> </div> `})export class AppComponent { state$: Observable<AppState>; constructor(private stateService: StateService) { this.state$ = this.stateService.getState(); }}高级应用模式1. 使用 AsyncPipeAsyncPipe 自动管理订阅和取消订阅。@Component({ selector: 'app-user-list', template: ` <div *ngIf="users$ | async as users"> <div *ngFor="let user of users"> {{ user.name }} </div> </div> `})export class UserListComponent { users$: Observable<User[]>; constructor(private dataService: DataService) { this.users$ = this.dataService.getUsers(); }}2. 使用 takeUntil 防止内存泄漏import { Component, OnInit, OnDestroy } from '@angular/core';import { Subject } from 'rxjs';import { takeUntil } from 'rxjs/operators';@Component({ selector: 'app-component', template: `...`})export class MyComponent implements OnInit, OnDestroy { private destroy$ = new Subject<void>(); ngOnInit() { this.dataService.getUsers().pipe( takeUntil(this.destroy$) ).subscribe(users => { this.users = users; }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}3. 使用 shareReplay 缓存数据import { Injectable } from '@angular/core';import { Observable, shareReplay } from 'rxjs';@Injectable({ providedIn: 'root'})export class CacheService { private cache = new Map<string, Observable<any>>(); get<T>(key: string, fetchFn: () => Observable<T>): Observable<T> { if (!this.cache.has(key)) { this.cache.set(key, fetchFn().pipe( shareReplay(1) )); } return this.cache.get(key) as Observable<T>; } clear() { this.cache.clear(); }}4. 使用 combineLatest 组合多个数据源@Component({ selector: 'app-dashboard', template: ` <div *ngIf="dashboardData$ | async as data"> <h2>Users: {{ data.users.length }}</h2> <h2>Posts: {{ data.posts.length }}</h2> <h2>Comments: {{ data.comments.length }}</h2> </div> `})export class DashboardComponent { dashboardData$: Observable<DashboardData>; constructor(private dataService: DataService) { this.dashboardData$ = combineLatest([ this.dataService.getUsers(), this.dataService.getPosts(), this.dataService.getComments() ]).pipe( map(([users, posts, comments]) => ({ users, posts, comments })) ); }}常见问题和解决方案1. 处理错误this.dataService.getUsers().pipe( catchError(error => { console.error('Failed to load users:', error); return of([]); // 返回空数组作为降级 })).subscribe(users => { this.users = users;});2. 重试失败的请求this.dataService.getUsers().pipe( retry(3), // 重试 3 次 catchError(error => { console.error('Failed after retries:', error); return of([]); })).subscribe(users => { this.users = users;});3. 加载状态管理@Component({ selector: 'app-user-list', template: ` <div *ngIf="isLoading">Loading...</div> <div *ngIf="users$ | async as users"> <div *ngFor="let user of users"> {{ user.name }} </div> </div> `})export class UserListComponent { isLoading = false; users$: Observable<User[]>; constructor(private dataService: DataService) {} loadUsers() { this.isLoading = true; this.dataService.getUsers().pipe( finalize(() => { this.isLoading = false; }) ).subscribe(users => { this.users = users; }); }}4. 搜索防抖@Component({ selector: 'app-search', template: ` <input #searchInput (input)="onSearch($event)" placeholder="Search..."> <div *ngIf="results$ | async as results"> <div *ngFor="let result of results"> {{ result.name }} </div> </div> `})export class SearchComponent { results$: Observable<SearchResult[]>; constructor(private dataService: DataService) {} onSearch(event: Event) { const query = (event.target as HTMLInputElement).value; this.results$ = of(query).pipe( debounceTime(300), distinctUntilChanged(), switchMap(q => this.dataService.search(q)) ); }}最佳实践1. 使用 AsyncPipe// ✅ 推荐@Component({ template: `<div *ngIf="data$ | async as data">{{ data }}</div>`})export class MyComponent { data$ = this.service.getData();}// ❌ 不推荐@Component({ template: `<div>{{ data }}</div>`})export class MyComponent implements OnInit, OnDestroy { data: any; private subscription: Subscription; ngOnInit() { this.subscription = this.service.getData().subscribe(data => { this.data = data; }); } ngOnDestroy() { this.subscription.unsubscribe(); }}2. 防止内存泄漏// ✅ 推荐export class MyComponent implements OnDestroy { private destroy$ = new Subject<void>(); ngOnInit() { this.service.getData().pipe( takeUntil(this.destroy$) ).subscribe(data => { this.data = data; }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}// ❌ 不推荐export class MyComponent { ngOnInit() { this.service.getData().subscribe(data => { this.data = data; }); // 忘记取消订阅 }}3. 错误处理// ✅ 推荐this.service.getData().pipe( catchError(error => { console.error('Error:', error); return of(defaultData); })).subscribe(data => { this.data = data;});// ❌ 不推荐this.service.getData().subscribe({ next: data => { this.data = data; }, error: error => { console.error('Error:', error); // 没有降级处理 }});4. 类型安全// ✅ 推荐interface User { id: number; name: string; email: string;}this.http.get<User[]>('/api/users').subscribe(users => { users.forEach(user => { console.log(user.name); // 类型安全 });});// ❌ 不推荐this.http.get('/api/users').subscribe((users: any) => { users.forEach((user: any) => { console.log(user.name); // 没有类型检查 });});总结RxJS 在 Angular 中的关键应用:HTTP 请求: 使用 HttpClient 处理异步请求表单处理: 监听表单值变化,实现防抖和验证路由处理: 监听路由参数和查询参数变化状态管理: 使用 BehaviorSubject 或 NgRx 管理应用状态AsyncPipe: 自动管理订阅,防止内存泄漏错误处理: 使用 catchError 和 retry 处理错误性能优化: 使用 debounceTime、shareReplay 等优化性能掌握 RxJS 在 Angular 中的应用是成为 Angular 高级开发者的关键。
阅读 0·2026年2月21日 16:54

RxJS 6 和 RxJS 7 有什么区别?

RxJS 6 与 RxJS 7 的主要变化1. 导入方式的变化RxJS 6:// 创建操作符import { of, from, interval } from 'rxjs';// 操作符(使用 pipe)import { map, filter, switchMap } from 'rxjs/operators';// 工具函数import { Subscription } from 'rxjs';RxJS 7:// 导入方式基本相同,但更加模块化import { of, from, interval } from 'rxjs';import { map, filter, switchMap } from 'rxjs/operators';// 新增了一些操作符import { debounceTime, distinctUntilChanged } from 'rxjs/operators';2. 新增的操作符RxJS 7 新增了多个有用的操作符:1. partition根据谓词函数将 Observable 分成两个。import { of, partition } from 'rxjs';const source$ = of(1, 2, 3, 4, 5, 6);const [evens$, odds$] = partition(source$, x => x % 2 === 0);evens$.subscribe(x => console.log('Even:', x)); // 2, 4, 6odds$.subscribe(x => console.log('Odd:', x)); // 1, 3, 52. tap 的改进tap 现在接受一个对象,可以分别处理不同的通知类型。import { of } from 'rxjs';import { tap } from 'rxjs/operators';of(1, 2, 3).pipe( tap({ subscribe: () => console.log('Subscribed'), next: value => console.log('Next:', value), error: error => console.log('Error:', error), complete: () => console.log('Completed'), unsubscribe: () => console.log('Unsubscribed') })).subscribe();3. connectable 操作符简化了多播 Observable 的创建。import { interval, connectable } from 'rxjs';import { take } from 'rxjs/operators';const source$ = interval(1000).pipe(take(5));const connectable$ = connectable(source$);connectable$.subscribe(value => console.log('Subscriber 1:', value));connectable$.subscribe(value => console.log('Subscriber 2:', value));connectable$.connect();4. shareReplay 的改进shareReplay 现在支持配置对象。import { interval } from 'rxjs';import { shareReplay, take } from 'rxjs/operators';const shared$ = interval(1000).pipe( take(5), shareReplay({ bufferSize: 2, refCount: true }));5. filter 的类型推断改进更好的 TypeScript 类型推断。import { of } from 'rxjs';import { filter } from 'rxjs/operators';const source$ = of(1, 2, 3, 4, 5);// RxJS 7 中类型推断更准确const even$ = source$.pipe( filter(x => x % 2 === 0) // x 被推断为 number);even$.subscribe(x => console.log(x)); // x 是 number 类型3. 废弃的操作符以下操作符在 RxJS 7 中被废弃:1. throwErrorRxJS 6:import { throwError } from 'rxjs';throwError('Error message');RxJS 7:import { throwError } from 'rxjs';// 推荐使用工厂函数throwError(() => new Error('Error message'));2. concat 和 merge 的静态方法虽然仍然可用,但推荐使用 concatWith 和 mergeWith。import { of, concat, merge } from 'rxjs';// 仍然可用concat(of(1), of(2)).subscribe();merge(of(1), of(2)).subscribe();// 推荐使用of(1).pipe(concatWith(of(2))).subscribe();of(1).pipe(mergeWith(of(2))).subscribe();4. 性能优化1. 更小的包体积RxJS 7 通过树摇优化减少了包体积。// RxJS 7 只导入需要的操作符import { of } from 'rxjs';import { map } from 'rxjs/operators';// 打包时只会包含 of 和 map2. 更快的执行速度某些操作符的执行速度得到了优化。import { of } from 'rxjs';import { map, filter } from 'rxjs/operators';// RxJS 7 中这些操作符执行更快of(1, 2, 3, 4, 5).pipe( map(x => x * 2), filter(x => x > 5)).subscribe();5. TypeScript 改进1. 更好的类型推断import { of } from 'rxjs';import { map, filter } from 'rxjs/operators';// RxJS 7 中类型推断更准确const result$ = of(1, 2, 3).pipe( map(x => x.toString()), // 推断为 Observable<string> filter(x => x.length > 0));2. 严格类型检查import { of } from 'rxjs';import { map } from 'rxjs/operators';// RxJS 7 中类型检查更严格of(1, 2, 3).pipe( map(x => x.toUpperCase()) // TypeScript 错误:number 没有 toUpperCase);6. 错误处理改进1. 更好的错误信息import { of } from 'rxjs';import { map } from 'rxjs/operators';of(1, 2, 3).pipe( map(x => { if (x === 2) throw new Error('Custom error'); return x; })).subscribe({ error: error => { console.log(error.message); // 更清晰的错误信息 }});2. onErrorResumeNext 的改进import { of, onErrorResumeNext } from 'rxjs';const source1$ = of(1, 2, 3).pipe( map(x => { if (x === 2) throw new Error('Error'); return x; }));const source2$ = of(4, 5, 6);onErrorResumeNext(source1$, source2$).subscribe(console.log);// 输出: 1, 4, 5, 67. 调度器改进1. animationFrameScheduler 的改进import { interval, animationFrameScheduler } from 'rxjs';import { take } from 'rxjs/operators';// RxJS 7 中 animationFrameScheduler 更稳定interval(0, animationFrameScheduler).pipe( take(60)).subscribe(frame => { console.log('Frame:', frame);});2. asapScheduler 的改进import { of, asapScheduler } from 'rxjs';// RxJS 7 中 asapScheduler 性能更好of(1, 2, 3, asapScheduler).subscribe(value => { console.log(value);});8. 测试工具改进1. TestScheduler 的改进import { TestScheduler } from 'rxjs/testing';// RxJS 7 中 TestScheduler 更易用const testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected);});testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c|'); const expected = '-a-b-c|'; expectObservable(source$).toBe(expected);});2. marble testing 的改进import { TestScheduler } from 'rxjs/testing';// RxJS 7 中 marble testing 更直观testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c|'); const expected = '-a-b-c|'; expectObservable(source$).toBe(expected);});9. 文档和示例改进1. 更好的文档RxJS 7 提供了更详细的文档和示例。// 官方文档提供了更多实用的示例import { of } from 'rxjs';import { map, filter } from 'rxjs/operators';of(1, 2, 3, 4, 5).pipe( map(x => x * 2), filter(x => x > 5)).subscribe(console.log);2. 更多的最佳实践官方文档中包含了更多的最佳实践建议。// 推荐使用 pipe 而不是链式调用import { of } from 'rxjs';import { map, filter } from 'rxjs/operators';// ✅ 推荐of(1, 2, 3).pipe( map(x => x * 2), filter(x => x > 5)).subscribe();// ❌ 不推荐of(1, 2, 3).pipe( map(x => x * 2)).pipe( filter(x => x > 5)).subscribe();10. 迁移指南1. 从 RxJS 6 迁移到 RxJS 7# 升级 RxJSnpm install rxjs@7# 检查废弃的 APInpm run lint2. 常见迁移问题问题 1: throwError 的使用// ❌ RxJS 6throwError('Error message');// ✅ RxJS 7throwError(() => new Error('Error message'));问题 2: 类型推断问题// 可能需要显式类型import { of } from 'rxjs';import { map } from 'rxjs/operators';const result$ = of(1, 2, 3).pipe( map(x => x.toString())) as Observable<string>;总结RxJS 7 相比 RxJS 6 的主要改进:新增操作符: partition、connectable 等性能优化: 更小的包体积,更快的执行速度TypeScript 改进: 更好的类型推断和严格类型检查错误处理: 更好的错误信息和处理机制调度器改进: 更稳定和高效的调度器测试工具: 更易用的 TestScheduler文档改进: 更详细的文档和示例迁移到 RxJS 7 可以带来更好的性能和开发体验,但需要注意一些 API 的变化。
阅读 0·2026年2月21日 16:54

RxJS 中的 Observable 和 Promise 有什么区别?

核心区别Observable 和 Promise 都是处理异步操作的工具,但它们在设计理念和使用方式上有显著差异:1. 执行时机Promise: 一旦创建就会立即执行,无法取消const promise = new Promise((resolve) => { console.log('Promise 立即执行'); resolve('done');});Observable: 只有订阅时才会执行,可以取消订阅const observable = new Observable((observer) => { console.log('Observable 订阅时执行'); observer.next('data');});observable.subscribe();2. 数据流Promise: 只能发出一个值,然后完成或失败promise.then(value => console.log(value)); // 只接收一个值Observable: 可以发出多个值,随时间推移持续推送数据observable.subscribe(value => console.log(value)); // 可以接收多个值3. 可取消性Promise: 无法取消,一旦创建就会执行到底const promise = fetch('/api/data');// 无法中途取消这个请求Observable: 可以通过 unsubscribe() 取消订阅const subscription = observable.subscribe();subscription.unsubscribe(); // 取消订阅4. 操作符支持Promise: 只有链式调用 then/catch/finallypromise .then(data => processData(data)) .then(result => console.log(result)) .catch(error => handleError(error));Observable: 丰富的操作符生态系统observable .pipe( map(data => processData(data)), filter(result => result.isValid), catchError(error => handleError(error)) ) .subscribe();5. 懒加载 vs 急切加载Promise: 急切执行,创建时就开始工作Observable: 懒加载,直到订阅才开始执行6. 同步/异步Promise: 总是异步的Observable: 可以是同步或异步的实际应用场景使用 Promise 的场景单次异步操作不需要取消只需要一个结果简单的异步流程使用 Observable 的场景需要处理多个值需要取消操作复杂的数据流处理事件处理WebSocket 连接实时数据流性能考虑Observable 在处理复杂异步流程时提供了更好的性能和灵活性,特别是在需要处理多个异步操作或需要取消操作的场景中。Promise 更适合简单的异步操作,代码更简洁直观。
阅读 0·2026年2月21日 16:28

RxJS 中常用的操作符有哪些?如何使用?

常用操作符分类1. 创建操作符 (Creation Operators)of创建一个发出指定值的 Observableimport { of } from 'rxjs';of(1, 2, 3).subscribe(console.log);// 输出: 1, 2, 3from将数组、Promise、Iterable 等转换为 Observableimport { from } from 'rxjs';from([1, 2, 3]).subscribe(console.log);// 输出: 1, 2, 3from(Promise.resolve('Hello')).subscribe(console.log);// 输出: Hellointerval / timer创建定时发出的 Observableimport { interval, timer } from 'rxjs';interval(1000).subscribe(console.log);// 每秒发出一个递增数字: 0, 1, 2, 3...timer(2000, 1000).subscribe(console.log);// 2秒后开始,每秒发出一个数字: 0, 1, 2, 3...2. 转换操作符 (Transformation Operators)map转换每个发出的值import { of } from 'rxjs';import { map } from 'rxjs/operators';of(1, 2, 3).pipe( map(x => x * 2)).subscribe(console.log);// 输出: 2, 4, 6switchMap取消之前的内部 Observable,只处理最新的import { fromEvent } from 'rxjs';import { switchMap } from 'rxjs/operators';fromEvent(document, 'click').pipe( switchMap(() => fetch('/api/data'))).subscribe(response => console.log(response));mergeMap并行处理所有内部 Observableimport { of } from 'rxjs';import { mergeMap } from 'rxjs/operators';of(1, 2, 3).pipe( mergeMap(x => of(x, x * 2))).subscribe(console.log);// 输出: 1, 2, 2, 4, 3, 6concatMap顺序处理内部 Observable,一个完成后再处理下一个import { of } from 'rxjs';import { concatMap } from 'rxjs/operators';of(1, 2, 3).pipe( concatMap(x => of(x, x * 2))).subscribe(console.log);// 输出: 1, 2, 2, 4, 3, 6 (顺序执行)3. 过滤操作符 (Filtering Operators)filter过滤符合条件的值import { of } from 'rxjs';import { filter } from 'rxjs/operators';of(1, 2, 3, 4, 5).pipe( filter(x => x % 2 === 0)).subscribe(console.log);// 输出: 2, 4take / takeLast / takeUntil只取前几个、后几个或直到某个条件import { interval, fromEvent } from 'rxjs';import { take, takeUntil } from 'rxjs/operators';interval(1000).pipe( take(3)).subscribe(console.log);// 输出: 0, 1, 2interval(1000).pipe( takeUntil(fromEvent(document, 'click'))).subscribe(console.log);// 点击时停止debounceTime在指定时间内只发出最后一个值import { fromEvent } from 'rxjs';import { debounceTime } from 'rxjs/operators';fromEvent(inputElement, 'input').pipe( debounceTime(300)).subscribe(event => console.log(event.target.value));throttleTime在指定时间内只发出第一个值import { fromEvent } from 'rxjs';import { throttleTime } from 'rxjs/operators';fromEvent(window, 'scroll').pipe( throttleTime(200)).subscribe(event => console.log('Scroll event'));4. 组合操作符 (Combination Operators)merge合并多个 Observable,并行发出值import { merge, interval } from 'rxjs';merge( interval(1000).pipe(map(x => `A${x}`)), interval(1500).pipe(map(x => `B${x}`))).subscribe(console.log);concat顺序连接多个 Observableimport { concat, of } from 'rxjs';concat( of(1, 2), of(3, 4), of(5, 6)).subscribe(console.log);// 输出: 1, 2, 3, 4, 5, 6combineLatest组合多个 Observable 的最新值import { combineLatest, of } from 'rxjs';combineLatest([ of(1, 2, 3), of('a', 'b', 'c')]).subscribe(([num, char]) => console.log(num, char));// 输出: [3, 'a'], [3, 'b'], [3, 'c']zip按索引组合多个 Observable 的值import { zip, of } from 'rxjs';zip( of(1, 2, 3), of('a', 'b', 'c')).subscribe(([num, char]) => console.log(num, char));// 输出: [1, 'a'], [2, 'b'], [3, 'c']5. 错误处理操作符 (Error Handling Operators)catchError捕获错误并返回新的 Observableimport { of } from 'rxjs';import { map, catchError } from 'rxjs/operators';of(1, 2, 3, 4).pipe( map(x => { if (x === 3) throw new Error('Error!'); return x; }), catchError(error => of('default value'))).subscribe(console.log);// 输出: 1, 2, 'default value'retry重试失败的 Observableimport { of } from 'rxjs';import { map, retry } from 'rxjs/operators';let count = 0;of(1, 2, 3).pipe( map(x => { count++; if (count < 3) throw new Error('Error!'); return x; }), retry(2)).subscribe(console.log);6. 工具操作符 (Utility Operators)tap执行副作用,不修改值import { of } from 'rxjs';import { tap, map } from 'rxjs/operators';of(1, 2, 3).pipe( tap(x => console.log('Before:', x)), map(x => x * 2), tap(x => console.log('After:', x))).subscribe();delay延迟发出值import { of } from 'rxjs';import { delay } from 'rxjs/operators';of(1, 2, 3).pipe( delay(1000)).subscribe(console.log);// 1秒后输出: 1, 2, 3实际应用示例搜索框防抖fromEvent(searchInput, 'input').pipe( debounceTime(300), map(event => event.target.value), filter(query => query.length > 2), switchMap(query => searchAPI(query))).subscribe(results => displayResults(results));自动保存form.valueChanges.pipe( debounceTime(1000), distinctUntilChanged(), switchMap(formData => saveAPI(formData))).subscribe();并行请求merge( getUserData(userId), getUserPosts(userId), getUserComments(userId)).pipe( combineLatestAll()).subscribe(([user, posts, comments]) => { renderUserProfile(user, posts, comments);});最佳实践合理使用 pipe() 链式调用注意操作符的执行顺序及时取消订阅避免内存泄漏根据场景选择合适的操作符使用 TypeScript 获得更好的类型推断
阅读 0·2026年2月21日 16:28

如何在 RxJS 中防止内存泄漏?

内存泄漏的原因在 RxJS 中,内存泄漏主要发生在以下几种情况:1. 未取消订阅最常见的内存泄漏原因是订阅了 Observable 但没有取消订阅。// ❌ 错误示例:内存泄漏class MyComponent { constructor() { this.data$ = http.get('/api/data').subscribe(data => { console.log(data); }); }}// 组件销毁时,订阅仍然存在,导致内存泄漏2. 长期运行的 Observableinterval、fromEvent 等会持续发出值的 Observable,如果不取消订阅会持续占用内存。// ❌ 错误示例setInterval(() => { console.log('Running...');}, 1000);// ✅ 正确示例const subscription = interval(1000).subscribe();subscription.unsubscribe();3. 闭包引用订阅回调中引用了外部变量,导致这些变量无法被垃圾回收。// ❌ 错误示例function createSubscription() { const largeData = new Array(1000000).fill('data'); return interval(1000).subscribe(() => { console.log(largeData.length); // largeData 被闭包引用 });}const sub = createSubscription();// 即使 sub 不再使用,largeData 也不会被释放4. 事件监听器未移除使用 fromEvent 创建的订阅如果不取消,事件监听器会一直存在。// ❌ 错误示例fromEvent(document, 'click').subscribe(event => { console.log('Clicked');});// 事件监听器永远不会被移除防止内存泄漏的方法1. 手动取消订阅最直接的方法是在适当的时候调用 unsubscribe()。class MyComponent { private subscriptions: Subscription[] = []; ngOnInit() { const sub1 = http.get('/api/data').subscribe(data => { this.data = data; }); const sub2 = interval(1000).subscribe(() => { this.update(); }); this.subscriptions.push(sub1, sub2); } ngOnDestroy() { this.subscriptions.forEach(sub => sub.unsubscribe()); }}2. 使用 takeUntiltakeUntil 是最常用的取消订阅方式之一。import { Subject, takeUntil } from 'rxjs';class MyComponent { private destroy$ = new Subject<void>(); ngOnInit() { http.get('/api/data').pipe( takeUntil(this.destroy$) ).subscribe(data => { this.data = data; }); interval(1000).pipe( takeUntil(this.destroy$) ).subscribe(() => { this.update(); }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}3. 使用 take、takeWhile、takeLast根据条件自动取消订阅。// take: 只取前 N 个值interval(1000).pipe( take(5)).subscribe(value => console.log(value));// 输出: 0, 1, 2, 3, 4 然后自动取消订阅// takeWhile: 满足条件时继续订阅interval(1000).pipe( takeWhile(value => value < 5)).subscribe(value => console.log(value));// 输出: 0, 1, 2, 3, 4 然后自动取消订阅// takeLast: 只取最后 N 个值of(1, 2, 3, 4, 5).pipe( takeLast(2)).subscribe(value => console.log(value));// 输出: 4, 54. 使用 first只取第一个值,然后自动取消订阅。http.get('/api/data').pipe( first()).subscribe(data => { console.log(data);});// 只发出第一个值就完成5. 使用 AsyncPipe(Angular)在 Angular 中,AsyncPipe 会自动管理订阅。@Component({ template: ` <div *ngIf="data$ | async as data"> {{ data }} </div> `})export class MyComponent { data$ = http.get('/api/data'); // AsyncPipe 会自动取消订阅}6. 使用 finalize在取消订阅时执行清理操作。http.get('/api/data').pipe( finalize(() => { console.log('Cleaning up...'); // 执行清理操作 })).subscribe(data => { console.log(data);});最佳实践1. 组件级别的订阅管理import { Component, OnDestroy } from '@angular/core';import { Subject, takeUntil } from 'rxjs';@Component({ selector: 'app-my', template: '...'})export class MyComponent implements OnDestroy { private destroy$ = new Subject<void>(); constructor() { this.setupSubscriptions(); } private setupSubscriptions() { // 所有订阅都使用 takeUntil this.http.get('/api/user').pipe( takeUntil(this.destroy$) ).subscribe(user => { this.user = user; }); this.route.params.pipe( takeUntil(this.destroy$) ).subscribe(params => { this.loadPage(params.id); }); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}2. 创建可重用的取消订阅工具import { Subject, Observable } from 'rxjs';import { takeUntil } from 'rxjs/operators';export class AutoUnsubscribe { private destroy$ = new Subject<void>(); protected autoUnsubscribe<T>(observable: Observable<T>): Observable<T> { return observable.pipe(takeUntil(this.destroy$)); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}// 使用class MyComponent extends AutoUnsubscribe { ngOnInit() { this.autoUnsubscribe(http.get('/api/data')).subscribe(data => { console.log(data); }); }}3. 使用 Subscription 集合import { Subscription } from 'rxjs';class MyService { private subscriptions = new Subscription(); startMonitoring() { const sub1 = interval(1000).subscribe(); const sub2 = fromEvent(document, 'click').subscribe(); this.subscriptions.add(sub1); this.subscriptions.add(sub2); } stopMonitoring() { this.subscriptions.unsubscribe(); }}4. 避免在回调中创建订阅// ❌ 错误示例interval(1000).subscribe(() => { http.get('/api/data').subscribe(data => { console.log(data); }); // 每次都创建新订阅,无法取消});// ✅ 正确示例interval(1000).pipe( switchMap(() => http.get('/api/data'))).subscribe(data => { console.log(data);});// switchMap 会自动取消之前的订阅检测内存泄漏1. 使用 Chrome DevTools// 在组件中添加标记class MyComponent { private id = Math.random(); ngOnDestroy() { console.log(`Component ${this.id} destroyed`); }}// 观察控制台,确认组件销毁时是否真的清理了订阅2. 使用 RxJS 调试工具import { tap } from 'rxjs/operators';http.get('/api/data').pipe( tap({ subscribe: () => console.log('Subscribed'), unsubscribe: () => console.log('Unsubscribed'), next: value => console.log('Next:', value), complete: () => console.log('Completed'), error: error => console.log('Error:', error) })).subscribe();常见陷阱1. 忘记取消嵌套订阅// ❌ 错误示例http.get('/api/user').subscribe(user => { http.get(`/api/posts/${user.id}`).subscribe(posts => { console.log(posts); }); // 内层订阅没有被管理});// ✅ 正确示例http.get('/api/user').pipe( switchMap(user => http.get(`/api/posts/${user.id}`))).subscribe(posts => { console.log(posts);});2. 在服务中创建订阅// ❌ 错误示例@Injectable()export class DataService { constructor(private http: HttpClient) { this.http.get('/api/data').subscribe(data => { this.data = data; }); // 服务中的订阅很难取消 }}// ✅ 正确示例@Injectable()export class DataService { private data$ = this.http.get('/api/data'); getData() { return this.data$; }}3. 忽略错误处理// ❌ 错误示例http.get('/api/data').subscribe(data => { console.log(data);});// 错误没有被处理,可能导致订阅无法正常完成// ✅ 正确示例http.get('/api/data').pipe( catchError(error => { console.error(error); return of([]); })).subscribe(data => { console.log(data);});总结防止 RxJS 内存泄漏的关键是:始终取消订阅:特别是对于长期运行的 Observable使用 takeUntil:这是最推荐的取消订阅方式避免嵌套订阅:使用 switchMap、concatMap 等操作符使用 AsyncPipe:在 Angular 中优先使用 AsyncPipe定期检查:使用 DevTools 检测内存泄漏错误处理:确保错误被正确处理,避免订阅卡住遵循这些最佳实践,可以有效地防止 RxJS 应用中的内存泄漏问题。
阅读 0·2026年2月21日 16:28