标签

Rxjs

一个 JavaScript 库,使用可观察量进行反应式编程,处理异步数据调用、回调和基于事件的程序。

Rxjs
服务端5月28日 08:26
RxJS Marble Testing 怎么写?弹珠测试核心用法与面试要点## 什么是 Marble Testing RxJS 的异步数据流测试一直是前端开发中的难点——回调嵌套、定时器模拟、异步断言让测试代码既冗长又脆弱。Marble Testing 是 RxJS 官方提供的一种解决方案:用简短的字符串(称为 marble 弹珠字符串)可视化地描述 Observable 的时间线和事件,再由 TestScheduler 在虚拟时间中同步执行,把原本需要等待真实异步的测试变成瞬时可验证的同步断言。 一句话概括:**Marble Testing = 弹珠字符串 + TestScheduler = 用可视化语法写同步的异步测试**。 ## Marble 语法速查 ### 核心符号 | 符号 | 含义 | 示例 | |------|------|------| | `-` | 时间流逝(1 帧,约 10ms) | `---` 表示 30ms | | `a-z` | 发出的值 | `-a-b-` 发出 a 和 b | | `\|` | 完成 | `-a-b-\|` 发出后完成 | | `#` | 错误 | `-a-#` 发出 a 后抛错 | | `()` | 同步分组 | `(abc\|)` 同步发出 a、b、c 后完成 | | `^` | 订阅点(hot Observable) | `^-a-b-` 从订阅点开始接收 | | `!` | 取消订阅 | `^-a-!` 订阅后收到 a 就取消 | ### 常见 marble 字符串解读 ```typescript // 冷 Observable:从订阅时开始 cold('-a-b-c-|') // → 10ms 发出 a,20ms 发出 b,30ms 发出 c,40ms 完成 cold('-a-b-#') // → 10ms 发出 a,20ms 发出 b,30ms 报错 cold('(abc|)') // → 同步发出 a、b、c,然后立即完成 // 热 Observable:从 ^ 标记处开始接收 hot('--a--b--c--|', { a: 1, b: 2, c: 3 }) // → ^ 之前的历史值对新订阅者不可见 ``` ## TestScheduler 基本用法 ### 初始化 TestScheduler ```typescript import { TestScheduler } from 'rxjs/testing'; let testScheduler: TestScheduler; beforeEach(() => { testScheduler = new TestScheduler((actual, expected) => { // 深比较实际输出与期望输出 expect(actual).toEqual(expected); }); }); ``` > 关键点:`run()` 回调内提供的 `cold`、`hot`、`expectObservable`、`expectSubscriptions` 是测试的四大工具,不要在 `run()` 外部使用它们。 ### 测试 map 操作符 ```typescript it('应将每个值转为大写', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|', { a: 'hello', b: 'world', c: 'rxjs' }); const expected = '-A-B-C-|'; const result$ = source$.pipe(map(x => x.toUpperCase())); expectObservable(result$).toBe(expected, { A: 'HELLO', B: 'WORLD', C: 'RXJS' }); }); }); ``` > 为什么要传值映射?当 marble 字符串中的字母与实际值不同时,必须通过第二个参数映射,否则默认值就是字母本身。 ## 面试高频:时间类操作符测试 时间相关操作符是 Marble Testing 最核心的应用场景,因为传统方式很难精确控制时间。 ### delay ```typescript it('应延迟 30ms 发出值', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-|'); const expected = '---a-b-|'; const result$ = source$.pipe(delay(30, testScheduler)); expectObservable(result$).toBe(expected); }); }); ``` ### debounceTime ```typescript it('应在 20ms 无新值后才发出', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a--b--c---|'); const expected = '-----b--c---|'; const result$ = source$.pipe(debounceTime(20, testScheduler)); expectObservable(result$).toBe(expected); }); }); ``` ### throttleTime ```typescript it('应每 30ms 最多发出一个值', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-ab-cde-f-|'); const expected = '-a---d--f-|'; const result$ = source$.pipe(throttleTime(30, testScheduler)); expectObservable(result$).toBe(expected); }); }); ``` > 面试追问:debounceTime 和 throttleTime 的区别?前者等"安静期"再发,后者等"冷却期"再放行——两者在 marble 图上表现为截然不同的输出模式。 ## 组合操作符的测试 ### merge:交错合并 ```typescript it('应交错合并两个流', () => { testScheduler.run(({ cold, expectObservable }) => { const source1$ = cold('-a---b-|'); const source2$ = cold('--c-d---|'); const expected = '-a-c-b-d-|'; const result$ = merge(source1$, source2$); expectObservable(result$).toBe(expected); }); }); ``` ### concat:顺序拼接 ```typescript it('应顺序拼接两个流', () => { testScheduler.run(({ cold, expectObservable }) => { const source1$ = cold('-a-b-|'); const source2$ = cold('--c-d-|'); const expected = '-a-b--c-d-|'; const result$ = concat(source1$, source2$); expectObservable(result$).toBe(expected); }); }); ``` ### combineLatest:取最新组合 ```typescript it('应在任一流发出时组合最新值', () => { testScheduler.run(({ cold, expectObservable }) => { const source1$ = cold('-a---b-|', { a: 1, b: 2 }); const source2$ = cold('--c-d---|', { c: 10, d: 20 }); const expected = '----xy-z|'; const result$ = combineLatest([source1$, source2$]); expectObservable(result$).toBe(expected, { x: [1, 20], y: [2, 20], z: [2, 20] }); }); }); ``` > 面试追问:combineLatest 为什么第一个输出是 `[1, 20]` 而不是 `[1, 10]`?因为 combineLatest 要求每个源至少发出一次后才开始组合——source1$ 发出 a=1 时 source2$ 还没发出过值,直到 source2$ 发出 d=20 时两个流才都有值,此时组合的是 source1$ 的最新值 1 和 source2$ 的最新值 20。 ## 错误处理测试 ### catchError ```typescript it('应捕获错误并返回替代值', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-#'); const expected = '-a-b-(d|)'; const result$ = source$.pipe( catchError(() => of('d')) ); expectObservable(result$).toBe(expected); }); }); ``` ### retry ```typescript it('应在出错时重试一次', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-#'); const expected = '-a-a-#'; const result$ = source$.pipe(retry(1)); expectObservable(result$).toBe(expected); }); }); ``` > 注意 `(d|)` 的括号:catchError 返回的 of('d') 是同步发出再完成的,在 marble 中必须用括号分组。 ## 订阅与取消订阅验证 `expectSubscriptions` 专门验证 Observable 何时被订阅、何时被取消订阅,这是面试中区分"会用"和"理解原理"的分水岭。 ```typescript it('应在 take(2) 后自动取消订阅', () => { testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const source$ = cold('-a-b-c-d-|'); const sub = '^---!'; const result$ = source$.pipe(take(2)); expectObservable(result$).toBe('-a-b-|'); expectSubscriptions(source$.subscriptions).toBe(sub); }); }); ``` > `^---!` 表示:订阅开始(^),经历 3 帧(---),取消订阅(!)。这验证了 take(2) 在收到第二个值后确实取消了上游订阅。 ## 实战场景 ### 搜索防抖 ```typescript it('应对输入做防抖后发起搜索', () => { testScheduler.run(({ cold, expectObservable }) => { const input$ = cold('-a--b---c-|'); const expected = '-----b---c-|'; const result$ = input$.pipe( debounceTime(20, testScheduler), distinctUntilChanged(), switchMap(q => search(q)) ); expectObservable(result$).toBe(expected); }); }); ``` ### 轮询与停止 ```typescript it('应按间隔轮询并在获取足够数据后停止', () => { testScheduler.run(({ expectObservable }) => { const expected = '-a-b-c-d-e-|'; const result$ = interval(10, testScheduler).pipe( take(5), map(i => String.fromCharCode(97 + i)) ); expectObservable(result$).toBe(expected); }); }); ``` ## Hot Observable 的测试 Hot Observable 在订阅前就已经开始发出值,测试时用 `^` 标记订阅起点,之后才能收到值。 ```typescript it('应只接收订阅后的值', () => { testScheduler.run(({ hot, expectObservable }) => { const source$ = hot('--a--b--c--|'); const sub = '---^-------!'; const expected = '--b--c--|'; const result$ = source$.pipe(take(2)); expectObservable(result$, sub).toBe(expected); }); }); ``` > 面试追问:cold 和 hot 的本质区别?cold 每次订阅都重新开始,数据对每个订阅者独立;hot 共享同一个数据源,新订阅者只能收到订阅后的值。 ## 常见陷阱 **1. marble 字符串长度必须对齐** ```typescript // 错误:长度不一致 const source$ = cold('-a-b-|'); const expected = '-A-B-C-|'; // 多了 C,长度不匹配 // 正确:每个位置一一对应 const source$ = cold('-a-b-|'); const expected = '-A-B-|'; ``` **2. 不要忘记传 TestScheduler** ```typescript // 错误:使用了真实的 setTimeout source$.pipe(debounceTime(100)); // 正确:传入 testScheduler 使用虚拟时间 source$.pipe(debounceTime(100, testScheduler)); ``` **3. run() 内部不要使用真实异步** ```typescript // 错误:run() 内用了 setInterval testScheduler.run(() => { setInterval(() => {}, 100); // 会干扰虚拟时间 }); ``` **4. 值映射与默认值** 当 marble 字母就是你想表达的值时,可以省略映射对象;但当字母与实际值不同(如 `a` 代表 `1`),必须显式传入。
前端5月28日 08:23
RxJS 中 Hot Observable 和 Cold Observable 有什么区别?## 先搞清楚一个核心:数据生产者在哪 Cold 和 Hot 的本质区别只有一个:**数据生产者(Producer)是在订阅时创建,还是在 Observable 创建时就已经存在?** - **Cold Observable**:生产者在订阅时才创建,每个订阅者拿到独立的生产者,这就是"单播"(Unicast) - **Hot Observable**:生产者在 Observable 创建时就已经存在,所有订阅者共享同一个生产者,这就是"多播"(Multicast) 理解了这一点,后面所有特性都能推导出来,不需要死记硬背。 ## Cold Observable:按需执行,人手一份 Cold Observable 是"惰性"的——没有人订阅,它什么都不做。每次有新订阅者,它都会从头执行一遍逻辑,产生一份独立的数据流。 ```javascript import { Observable } from 'rxjs'; const cold$ = new Observable(subscriber => { console.log('执行逻辑'); subscriber.next(Math.random()); subscriber.complete(); }); cold$.subscribe(v => console.log('订阅者A:', v)); // 执行逻辑 → 订阅者A: 0.314159 cold$.subscribe(v => console.log('订阅者B:', v)); // 执行逻辑 → 订阅者B: 0.271828 // 两次订阅各执行一次,随机值不同——因为每个订阅者有独立的生产者 ``` 用一个生活类比:Cold Observable 像**电影院的电影文件**——每个观众点播时,影院单独为他播放一份,各看各的进度,互不影响。 **常见 Cold 操作符**:`of()`、`from()`、`interval()`、`timer()`、`ajax()`、Angular 的 `HttpClient.get()` ## Hot Observable:共享数据流,先到先得 Hot Observable 是"主动"的——不管有没有人订阅,生产者都在运作。新订阅者只能收到订阅之后的数据,之前发过的就错过了。 ```javascript import { Subject } from 'rxjs'; const subject = new Subject(); subject.subscribe(v => console.log('订阅者A:', v)); subject.next(1); // 订阅者A: 1 subject.subscribe(v => console.log('订阅者B:', v)); subject.next(2); // 订阅者A: 2, 订阅者B: 2 // 订阅者B 没收到 1,因为订阅晚了 ``` 类比:Hot Observable 像**电视直播**——频道一直在播,你打开电视只能看到当前和后续的节目,之前的已经播完了回不来。 **常见 Hot 来源**:`Subject` 及其变体(`BehaviorSubject`、`ReplaySubject`、`AsyncSubject`)、`fromEvent()` 绑定的 DOM 事件、WebSocket 连接 ## 单播 vs 多播:从源码角度理解 Cold Observable 的 `subscribe` 函数里直接创建生产者: ```javascript // Cold:每次 subscribe 都执行这个函数,各订阅者独立 const cold$ = new Observable(subscriber => { const source = createProducer(); // 每个订阅者创建自己的生产者 source.onData(data => subscriber.next(data)); }); ``` Hot Observable 的生产者在外部,`subscribe` 只是注册监听: ```javascript // Hot:生产者已存在,subscribe 只是往里注册回调 const hot$ = new Observable(subscriber => { externalSource.addListener(data => subscriber.next(data)); // 所有订阅者监听同一个 externalSource }); ``` 所以 Cold → Hot 的本质就是**把内部生产者提到外部,让多个订阅者共享**。 ## Cold 转 Hot 的三种方式 ### share()——最常用 `share()` 内部使用 `Subject` 实现多播,并且带 `refCount` 机制:当订阅者数从 1 降到 0 时自动断开上游,再有新订阅者时重新连接。 ```javascript import { interval } from 'rxjs'; import { share, take } from 'rxjs/operators'; const source$ = interval(1000).pipe(take(5)); const shared$ = source$.pipe(share()); shared$.subscribe(v => console.log('A:', v)); setTimeout(() => shared$.subscribe(v => console.log('B:', v)), 2000); // A 和 B 共享同一个 interval 计时器 // B 在第2秒加入,只能收到 2、3、4 ``` ### shareReplay(n)——缓存最近 n 个值 `shareReplay` 在 `share` 的基础上缓存最近的 n 个值,新订阅者能立即收到缓存数据,解决"来晚了错过数据"的问题。 ```javascript import { interval } from 'rxjs'; import { shareReplay, take } from 'rxjs/operators'; const source$ = interval(1000).pipe(take(5), shareReplay(1)); source$.subscribe(v => console.log('A:', v)); setTimeout(() => { source$.subscribe(v => console.log('B:', v)); // B 立即收到缓存的最新的一个值,然后继续接收后续值 }, 3000); ``` **关键区别**:`share()` 的 refCount 在订阅者归零后断开上游,而 `shareReplay()` 默认不会断开(可通过 `config.resetOnComplete` 等参数调整)。 ### publish() + connect()——手动控制 `publish()` 把 Cold Observable 变成 `ConnectableObservable`,必须手动调用 `connect()` 才开始执行。适合需要先注册所有订阅者再启动数据流的场景。 ```javascript import { interval } from 'rxjs'; import { publish, take } from 'rxjs/operators'; const source$ = interval(1000).pipe(take(5), publish()); source$.subscribe(v => console.log('A:', v)); source$.subscribe(v => console.log('B:', v)); // 此时不执行,等所有订阅者就绪 source$.connect(); // 手动启动 ``` ## 实际开发中的选择 ### 用 Cold 的场景 - **HTTP 请求**:每个组件独立获取数据,互不干扰 - **独立计算**:每个订阅者需要各自的处理结果 - **可重复执行**:每次订阅都希望从头获取完整数据 ### 用 Hot 的场景 - **共享 HTTP 结果**:多个组件需要同一接口的数据,用 `shareReplay(1)` 避免重复请求 - **事件监听**:DOM 事件、WebSocket 消息天然就是多播的 - **状态管理**:`BehaviorSubject` 持有最新状态,新订阅者立即获取当前值 ## 最容易踩的坑 ### 坑1:忘记共享导致重复请求 ```javascript // 每次订阅都发新请求——大忌 const data$ = http.get('/api/data'); data$.subscribe(handle1); data$.subscribe(handle2); // 又发了一次请求 // 用 shareReplay 共享 const data$ = http.get('/api/data').pipe(shareReplay(1)); data$.subscribe(handle1); data$.subscribe(handle2); // 只发一次请求 ``` ### 坑2:share() 的 refCount 陷阱 ```javascript const source$ = interval(1000).pipe(share()); const sub1 = source$.subscribe(v => console.log('A:', v)); const sub2 = source$.subscribe(v => console.log('B:', v)); sub1.unsubscribe(); sub2.unsubscribe(); // 所有订阅者都取消后,上游停止 source$.subscribe(v => console.log('C:', v)); // 重新订阅,上游重新连接,C 从0开始收数据 // 如果这里用 shareReplay(1),行为可能不同 ``` ### 坑3:shareReplay 缓存过多 ```javascript // 缓存1000个值,内存会爆 interval(1000).pipe(shareReplay(1000)); // 通常缓存1个就够了 interval(1000).pipe(shareReplay(1)); ``` ## 一张表总结 | 特性 | Cold Observable | Hot Observable | |------|----------------|----------------| | 生产者创建时机 | 订阅时 | Observable 创建时 | | 数据流 | 每个订阅者独立 | 所有订阅者共享 | | 传播方式 | 单播(Unicast) | 多播(Multicast) | | 错过数据 | 不会,每次从头 | 会,只能收订阅后的数据 | | 典型代表 | `of`、`from`、HTTP | `Subject`、DOM 事件 | | 转 Hot | `share()`、`shareReplay()` | 不可转 Cold | 记住核心判断:**看生产者——订阅时创建就是 Cold,早就存在就是 Hot。** 面试中如果能从生产者角度解释单播/多播的区别,再提到 `share` 的 refCount 机制和 `shareReplay` 的缓存策略,基本就能拿到高分。
前端5月27日 21:47
RxJS 中 switchMap、mergeMap、concatMap 有什么区别?常用操作符有哪些?## 答案:六类核心操作符 面试中最常考的 RxJS 操作符分六类: **创建类**:`of`、`from`、`interval`、`timer` — 把同步值、数组、Promise、定时器转为 Observable。 **转换类**:`map` 逐值转换;`switchMap` 切换到新 Observable 并**取消前一个**(搜索框场景首选);`mergeMap` 并行处理所有内部 Observable(不取消,不保证顺序);`concatMap` 顺序处理,上一个完成才订阅下一个。 **过滤类**:`filter` 条件过滤;`take(n)` 只取前 n 个;`takeUntil(notifier)` notifier 发出时终止;`debounceTime` 停顿后才发出(防抖);`throttleTime` 间隔内只取第一个(节流)。 **组合类**:`merge` 并行合并;`concat` 顺序连接;`combineLatest` 任一更新都组合最新值;`zip` 按索引一一配对。 **错误处理类**:`catchError` 捕获并返回替代 Observable;`retry(n)` 失败后自动重试 n 次。 **工具类**:`tap` 执行副作用不改值(调试利器);`delay` 延迟发射;`distinctUntilChanged` 连续重复值只发一次。 ### switchMap vs mergeMap vs concatMap:面试最高频对比 三者的核心区别在于**对前一个内部 Observable 的处理策略**: - **switchMap**:新值到来 → 取消上一个 → 订阅新的。搜索框、路由切换等竞态场景必选,保证只拿到最新结果。 - **mergeMap**:全部并行订阅,互不干扰。适合批量请求、日志上报等不关心顺序的场景。可传第二参数控制并发数。 - **concatMap**:排队串行,上一个完成才订阅下一个。适合顺序敏感的写操作,如依次保存表单步骤。 ```javascript // switchMap:搜索框 — 自动取消旧请求,避免竞态 fromEvent(input, 'input').pipe( debounceTime(300), switchMap(query => http.get('/search', { query })) ) // mergeMap:批量删除 — 限制3并发 ids.pipe(mergeMap(id => http.delete(`/item/${id}`), 3)) // concatMap:串行保存 — 保证顺序 actions.pipe(concatMap(action => http.post('/save', action))) ``` 选错操作符的后果:搜索场景用 mergeMap 会显示旧请求结果覆盖新结果;表单保存用 mergeMap 可能后发先至导致数据错乱。 ### 高频实战组合模式 ```javascript // 搜索防抖 + 去重 + 空值过滤 + 自动取消旧请求 searchInput.pipe( debounceTime(300), distinctUntilChanged(), switchMap(query => query.length > 0 ? searchAPI(query) : EMPTY) ) // 表单自动保存(去重 + 防抖) form.valueChanges.pipe( debounceTime(1000), distinctUntilChanged(), switchMap(data => saveAPI(data)) ) ``` ### 追问方向 - `combineLatest` vs `zip` 的区别?— combineLatest 任一更新即触发输出,zip 等所有 Observable 都发了新值才按索引配对 - 如何避免内存泄漏?— `takeUntil(destroy$\)` 模式,组件销毁时 `destroy$.next()` 统一取消,优于手动 unsubscribe - `shareReplay(1)` 解决什么问题?— 多订阅者共享同一数据源并缓存最新值,避免重复 HTTP 请求 - `exhaustMap` 是什么?— 忽略新值直到当前内部 Observable 完成,适合防止重复提交(登录按钮连击) - `forkJoin` vs `combineLatest`?— forkJoin 等所有完成后只取各自最后一个值,类似 Promise.all;combineLatest 每次变化都触发
前端5月27日 21:45
RxJS 中的 Observable 和 Promise 有什么区别?## 核心答案 Observable 和 Promise 都是处理异步操作的工具,关键区别在四个字:**单值/多值、惰性/急切、可取消/不可取消、单播/多播**。 | 维度 | Promise | Observable | |------|---------|------------| | 值的数量 | 只能 resolve 一次 | 可以 next 多次 | | 执行时机 | 创建即执行(eager) | 订阅才执行(lazy) | | 取消 | 无法取消 | unsubscribe() 取消 | | 多播 | 同一 Promise 多次 then 共享同一结果 | 默认冷 Observable,每次订阅独立执行 | | 错误重试 | 需手动包装 | retry 操作符一行搞定 | | 操作符 | then/catch/finally | map、filter、switchMap 等上百个 | ## 逐条展开 **1. 单值 vs 多值** Promise 只能 resolve 一次,后续调用被忽略;Observable 通过 next 可以持续推送数据。 ```javascript // Promise — 只拿到 1 const p = new Promise(resolve => { resolve(1); resolve(2); // 无效 }); // Observable — 拿到 1, 2, 3 const obs$ = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }); ``` **2. 惰性 vs 急切** Promise 构造函数里的代码在 new 时就跑了;Observable 的回调要等 subscribe 才执行。这意味着不订阅的 Observable 什么也不做,省资源。 **3. 可取消** 这是面试高频追问点。Promise 一旦创建就跑到完,想中途撤销只能用第三方库包装。Observable 原生支持: ```javascript const sub = interval(1000).subscribe(x => console.log(x)); setTimeout(() => sub.unsubscribe(), 3000); // 3秒后停止 ``` **4. 冷 vs 热(单播 vs 多播)** 冷 Observable 每次订阅都重新执行一遍;热 Observable(如 Subject)多个订阅者共享同一数据源。Promise 天然是"热"的——创建后结果确定,所有 then 拿到同一个值。 ```javascript // 冷 Observable — 两个订阅各跑一次 const cold$ = of(Math.random()); cold$.subscribe(v => console.log(v)); // 0.123 cold$.subscribe(v => console.log(v)); // 0.456(不同值) // 用 share() 变热 const hot$ = cold$.pipe(share()); ``` **5. 错误处理与重试** Promise 出错只能 catch 一次,重试要自己写循环。Observable 有 retry、retryWhen 等操作符,一行搞定指数退避重试。 ```javascript // Promise — 手动重试 function retryFetch(url, n) { return fetch(url).catch(err => n > 0 ? retryFetch(url, n - 1) : Promise.reject(err)); } // Observable — 一行重试 http.get(url).pipe(retry(3)); ``` **6. 操作符生态** Promise 只有 then 链,复杂异步编排(竞速、合并、去抖)要手写。Observable 有 switchMap、mergeMap、debounceTime、combineLatest 等操作符,声明式描述数据流。 ## 面试追问方向 - **什么时候用 Promise 就够了?** 单次异步请求、不需要取消、不需要流式处理——Promise 更简单。 - **Observable 能完全替代 Promise 吗?** 理论上可以(from(promise) 转换),但简单场景用 Observable 是过度设计。 - **Angular 为什么选 Observable?** HTTP 请求可能需取消(路由离开),表单值变化是流,组件生命周期也是流——Observable 统一了这些模型。
前端5月27日 21:42
如何在 RxJS 中防止内存泄漏?## 核心答案 RxJS 内存泄漏的根因是**订阅了 Observable 却未取消订阅**,导致回调闭包持有外部引用,阻止垃圾回收。防止泄漏的关键就一条:**确保每个订阅都有取消的时机**。 最推荐的方式是 `takeUntil` 模式: ```typescript private destroy$ = new Subject<void>(); ngOnInit() { this.http.get('/api/data').pipe( takeUntil(this.destroy$) ).subscribe(data => this.data = data); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); } ``` 组件销毁时 `destroy$` 发出通知,所有通过 `takeUntil` 管道的订阅自动完成,闭包释放,GC 可回收相关内存。 ## 哪些 Observable 必须取消订阅 判断标准:**不会自动 complete 的流必须手动取消**。 - `interval`、`timer` —— 持续发射,永不完成 - `fromEvent` —— 事件监听,永不完成 - `Subject` / `BehaviorSubject` —— 需手动调 `complete()` - Angular 的 `params`、`valueChanges` —— 持续流 HTTP 请求 `httpClient.get()` 发射一次后自动 complete,理论上不必取消,但用 `takeUntil` 也不亏——能顺便中断请求。 ## 三种取消策略对比 **1. `takeUntil`** —— 最推荐,声明式,一个 `destroy$` 管所有订阅 **2. `Subscription` 聚合** —— `new Subscription()` + `.add()`,批量 `unsubscribe()`,适合非组件场景 **3. `take(1)` / `first()`** —— 只需首个值时用,取完自动 complete,注意如果流不发射也不会自动取消 ## 容易踩的坑 **嵌套订阅**:外层订阅的回调里再 subscribe,内层订阅完全失控。用 `switchMap` 替代——它自动取消前一次内部订阅: ```typescript // 嵌套订阅,内层泄漏 this.http.get('/user').subscribe(user => { this.http.get(`/posts/${user.id}`).subscribe(...); }); // switchMap 自动管理 this.http.get('/user').pipe( switchMap(user => this.http.get(`/posts/${user.id}`)) ).subscribe(posts => ...); ``` **闭包引用大对象**:订阅回调捕获外部变量,即使该变量不再使用,只要订阅存活就无法回收。取消订阅即释放闭包引用。 **服务中的订阅**:Service 生命周期等于应用生命周期,在里面 `.subscribe()` 几乎不可能取消。正确做法是返回 Observable,让调用方决定何时订阅和取消。 ## 追问方向 - `takeUntil` 和 `unsubscribe()` 有什么区别?——前者在管道中完成流,后者是外部强制中断;`takeUntil` 更符合声明式风格 - `switchMap`、`concatMap`、`mergeMap` 哪个能防止内存泄漏?——`switchMap` 自动取消前一次,其余不会;多对多场景需配合 `takeUntil` - 如何检测 RxJS 内存泄漏?——Chrome DevTools Memory 面板拍快照,对比组件销毁前后 retained size;或在 `ngOnDestroy` 打日志确认是否执行
前端5月27日 21:29
RxJS 中如何处理背压(Backpressure)问题?## 什么是背压 RxJS 中 Observable 默认是推模型——生产者决定发射节奏。当数据产生速度超过消费者处理速度时,未处理的值会堆积在内存中,这就是背压问题。典型场景包括:高频 DOM 事件、WebSocket 消息流、快速轮询的传感器数据。 RxJS 不像 RxJava 有 Flowable 这种原生背压类型,它的策略是"丢、缓、换"三种思路。 ## 丢:节流与采样 不需要每一条数据时,主动丢弃多余值: - **throttleTime**:时间窗口内只取第一个,适合 scroll/resize 等高频事件 - **debounceTime**:等输入静止后再发射,适合搜索框联想 - **sampleTime**:周期性取最新值,中间值全部丢弃 - **auditTime**:周期性取窗口末尾值,与 sample 语义不同 ```typescript // 滚动事件:200ms 内只处理一次 fromEvent(window, 'scroll').pipe( throttleTime(200) ) // 搜索框:停止输入 300ms 后请求 fromEvent(input, 'input').pipe( debounceTime(300), switchMap(e => search(e.target.value)) ) ``` 关键区别:throttle 保证有规律地采样,debounce 保证只在"安静"后触发,面试中经常要求区分二者。 ## 缓:批量打包 需要全部数据但可以延迟处理时,把值攒起来一起发: - **bufferTime / bufferCount**:按时间或数量打包成数组 - **windowTime / windowCount**:类似 buffer,但输出内层 Observable 而非数组 ```typescript // 每 500ms 打包一次传感器数据 sensor$.pipe(bufferTime(500)).subscribe(batch => { processBatch(batch); // 一次处理多条 }); ``` buffer 的风险是缓冲区无限增长,生产速度持续超过消费速度时仍然会内存溢出。实际项目中建议配合 take 或设置上限。 ## 换:切换并发模型 改变数据消费方式来匹配生产速度: - **concatMap**:完全串行,一个结束再发下一个,最安全但最慢 - **mergeMap(n)**:限制并发数为 n,兼顾吞吐和资源 - **switchMap**:只保留最新请求,自动取消前一个 ```typescript // 并发限制为 3 的批量请求 from(ids).pipe( mergeMap(id => fetchUser(id), 3) ) // 搜索场景:自动取消旧请求 input$.pipe( debounceTime(300), switchMap(query => searchAPI(query)) ) ``` 面试高频追问:mergeMap、concatMap、switchMap、exhaustMap 四者的区别。核心区分点在于——新值到来时,是排队(concat)、并行(merge)、取消旧的(switch)还是忽略新的(exhaust)。 ## 实际选型思路 面试中更看重你能否根据场景选对策略,而不是背诵 API: | 场景 | 推荐策略 | 理由 | |---|---|---| | 滚动/resize 事件 | throttleTime | 只需采样,丢数据无害 | | 搜索输入联想 | debounceTime + switchMap | 避免无效请求,自动取消旧请求 | | 批量 API 请求 | mergeMap(n) | 控制并发,兼顾效率 | | 有序上传文件 | concatMap | 顺序保证,避免竞争 | | 高频传感器数据 | bufferTime | 批量处理更高效 | RxJS 没有真正的拉取式背压,本质上是"有损控制"。如果业务要求零丢数据,需要考虑在架构层面引入队列或换用支持 Flowable 的方案。
前端5月27日 21:26
RxJS 中如何创建自定义操作符?## 核心答案 RxJS 创建自定义操作符有三种方式,优先使用**组合现有操作符**,其次用 **pipeable 函数**,避免直接 new Observable: ```typescript // 1. 组合现有操作符(推荐) function debounceSearch(ms = 300) { return pipe( debounceTime(ms), distinctUntilChanged() ); } // 2. pipeable 函数 function debug<T>(label: string): OperatorFunction<T, T> { return source$ => source$.pipe( tap(v => console.log(label, v)) ); } ``` ## 三种方式的区别 | 方式 | 适用场景 | 复杂度 | |------|---------|--------| | 组合现有操作符 | 逻辑由已有操作符拼装即可 | 低 | | pipeable 函数(返回 `source$ => new Observable`) | 需要精细控制订阅和退订 | 中 | | 直接 new Observable | 极端定制场景,需手动管理全部生命周期 | 高 | 直接 new Observable 需要自己处理 next/error/complete 和 teardown,容易遗漏导致内存泄漏,非必要不使用。 ## 追问:操作符内如何保证资源释放? 返回的 Observable 订阅时必须返回 teardown 函数: ```typescript function withTimeout<T>(ms: number): OperatorFunction<T, T> { return source$ => new Observable(subscriber => { const timer = setTimeout(() => subscriber.error(new Error('timeout')), ms); const sub = source$.subscribe({ next: v => subscriber.next(v), error: e => subscriber.error(e), complete: () => subscriber.complete() }); // teardown:取消定时器 + 退订上游 return () => { clearTimeout(timer); sub.unsubscribe(); }; }); } ``` 忘记返回 teardown 是自定义操作符最常见 bug,会导致定时器、子订阅等资源无法回收。 ## 追问:RxJS 7+ 写法有什么变化? `retryWhen`、`tap(next, error, complete)` 三参数重载等已在 RxJS 7 中弃用或移除。自定义操作符应使用 `retry({ count, delay })` 等新 API,组合操作符时优先用 `pipe()` 而非链式调用。
前端5月27日 21:26
RxJS 中如何处理错误?有哪些错误处理操作符?## 核心答案 RxJS 中 Observable 一旦出错,整个流就会终止。常用的错误处理操作符有五个: - **catchError** — 捕获错误,返回替代 Observable,流继续 - **retry(n)** — 出错后重新订阅,最多重试 n 次 - **retryWhen** — 自定义重试策略(延迟、指数退避等) - **finalize** — 流结束(无论成功还是出错)时执行清理 - **onErrorResumeNext** — 出错后跳到下一个 Observable 继续 其中 catchError 是面试最常考的,关键在于理解它的位置决定行为:放在 `mergeMap` 内部只捕获单条流错误,放在外部则整个流被替换。 ## catchError 的位置陷阱 ```typescript // 错误写法:外层 catchError,一条失败整条流终止 source$.pipe( mergeMap(id => fetchData(id)), catchError(() => of(fallback)) // 任一请求失败,后续全部跳过 ); // 正确写法:内层 catchError,单条失败不影响其他 source$.pipe( mergeMap(id => fetchData(id).pipe( catchError(() => of(fallback)) // 只替换这一条 )) ); ``` 面试中经常追问这个区别:内层捕获让每条数据流独立容错,外层捕获则是兜底策略。 ## retry 与 retryWhen 怎么选 `retry(3)` 简单粗暴,立刻重试三次。实际项目中更常见带延迟的重试: ```typescript source$.pipe( retryWhen(errors => errors.pipe( scan((count, err) => { if (count >= 3) throw err; return count + 1; }, 0), delayWhen(count => timer(Math.pow(2, count) * 1000)) ) ) ); ``` 指数退避重试是生产环境的标准做法,面试能答出这个基本过关。注意 RxJS 7 之后 `retryWhen` 已废弃,推荐用 `retry({ delay: ... })` 替代。 ## finalize 不是 finally `finalize` 无论成功、出错还是取消订阅都会执行,适合释放资源(关闭连接、清除定时器等)。它不接收参数,拿不到错误信息,别跟 Promise 的 finally 混淆。 ## 追问方向 - catchError 返回 throwError 会怎样?——错误继续向上传播 - retry 的重试是重新订阅还是重新执行?——重新订阅整个上游 - Observable 出错后订阅者还能收到值吗?——不能,流已终止
前端5月27日 21:25
RxJS 中的调度器(Scheduler)是什么?如何使用?## 调度器是什么 调度器决定 RxJS 中通知(next/error/complete)的执行时机和上下文。简单说:**它控制一段 Observable 逻辑"在什么时候跑"——同步、微任务、宏任务还是动画帧。** RxJS 内置 5 种调度器,核心区别如下: | 调度器 | 底层机制 | 典型场景 | |--------|----------|----------| | null(同步) | 直接递归调用 | 默认,少量数据 | | queueScheduler | 同步蹦床调度 | 递归/迭代,防栈溢出 | | asapScheduler | Promise.then(微任务) | 优先级高于宏任务的异步 | | asyncScheduler | setInterval(宏任务) | 延迟、定时操作 | | animationFrameScheduler | requestAnimationFrame | 浏览器动画 | ## 怎么指定调度器 三种方式: **1. 创建时传入**——作为操作符最后一个参数: ```typescript of(1, 2, 3, asyncScheduler).subscribe(console.log); ``` **2. observeOn / subscribeOn**: - `observeOn(asyncScheduler)`:下游通知切到异步调度 - `subscribeOn(asyncScheduler)`:上游订阅动作切到异步调度 二者区别常被追问:`subscribeOn` 影响的是"什么时候开始执行 Observable 逻辑",`observeOn` 影响的是"下游在哪个调度器上接收通知"。 **3. schedule 方法**——手动调度: ```typescript asyncScheduler.schedule(() => console.log('1s后'), 1000); ``` ## 面试高频追问 **asapScheduler 和 asyncScheduler 有什么区别?** asapScheduler 走微任务队列(Promise.then),asyncScheduler 走宏任务队列(setTimeout/setInterval)。微任务优先级更高,会在当前宏任务结束后、下一个宏任务之前执行。需要尽快响应但不阻塞主线程时选 asap,需要真正延迟时选 async。 **queueScheduler 和同步调度器有什么区别?** 同步调度器直接递归调用,大量数据可能栈溢出。queueScheduler 用蹦床调度(trampoline),把递归展开为循环迭代,避免栈溢出同时保持同步语义。 **什么时候该手动指定调度器?** 大多数时候不需要——RxJS 按最小并发原则自动选择默认调度器。手动指定主要出现在三个场景:测试中用 TestScheduler 控制时间、动画中用 animationFrameScheduler 同步渲染帧、需要改变默认执行上下文时。 **如何在测试中控制调度器?** 使用 `TestScheduler`,它提供虚拟时钟,可以用 marble diagram 声明时序并同步断言结果,避免真实等待。 ## 选择建议 不需要调度器就不加。需要延迟选 async,需要非阻塞优先级选 asap,递归防溢出选 queue,动画选 animationFrame。过度使用调度器只会增加复杂度和性能开销。