前端5月30日 21:21
RxJS 中 Subject、BehaviorSubject、ReplaySubject 和 AsyncSubject 怎么选?Subject 是既能订阅又能 next 的 Observable,常用来把外部事件推给多个订阅者。Subject 不保存历史值;BehaviorSubject 保存当前值,新订阅者立刻拿到;ReplaySubject 可以回放一段历史;AsyncSubject 只在 complete 时发出最后一个值。选型时先问:新订阅者要不要旧值,要几个旧值,结果是不是只有完成后才有意义。
## 追问
### BehaviorSubject 为什么适合状态?
状态通常需要当前值,比如用户信息、主题配置、表单快照。它要求初始值,所以你必须明确空状态是什么。
### ReplaySubject 有什么边界?
ReplaySubject 可以保留多个值或一段时间窗口。如果不设置 bufferSize 或 windowTime,历史值可能越攒越多。
### AsyncSubject 现在还常用吗?
比较少,因为很多只取最终结果的场景已被 Promise 或 lastValueFrom 覆盖。它适合只关心完成后最后值的过程。
### Subject 当事件总线有什么坑?
全局事件总线会让数据流来源变模糊。更稳的做法是封装 Subject,只暴露 asObservable。
## 写段代码
```javascript
const currentUser$ = new BehaviorSubject(null);
currentUser$.next(user);
currentUser$.subscribe(render);
```标签
Rxjs
一个 JavaScript 库,使用可观察量进行反应式编程,处理异步数据调用、回调和基于事件的程序。

前端5月30日 21:21
RxJS 中 switchMap、mergeMap、concatMap 该怎么选?switchMap、mergeMap、concatMap 都是把外层值映射成内部 Observable,再把结果摊平,但它们处理“新任务来了,旧任务怎么办”的策略不同。switchMap 会取消旧任务,只要最新结果;mergeMap 会并发执行,所有结果都要;concatMap 会排队,一个完成后再做下一个。
## 追问
### 为什么搜索框通常用 switchMap?
用户连续输入时,旧关键词请求已经没有展示价值。switchMap 会退订上一次内部流,避免慢请求晚返回覆盖新结果。
### mergeMap 有什么风险?
mergeMap 默认不限制并发,外层值很多时会同时打出大量请求。实际项目常用第二个参数限制并发数。
### concatMap 为什么慢但常用?
它故意排队,前一个内部 Observable 不 complete,后一个不会开始。适合顺序保存、支付步骤、分片上传。
### 和 exhaustMap 怎么区分?
exhaustMap 忙的时候忽略新任务,适合防重复提交。switchMap 抛弃旧的保留新的,exhaustMap 保留旧的忽略新的。
## 写段代码
```javascript
searchText$.pipe(
debounceTime(300), distinctUntilChanged(),
switchMap(q => apiSearch(q).pipe(catchError(() => of([]))))
).subscribe(renderList);
```前端5月30日 21:21
RxJS 6 升级到 RxJS 7 时要注意哪些变化?RxJS 7 不是把 RxJS 6 推倒重来,而是在保持 pipe 操作符模型的基础上,重点修了类型、弃用 API、Promise 转换和多播写法。项目从 6 升到 7,通常不用大面积重写业务流,但要重点检查 throwError、toPromise、combineLatest/concat/merge 的静态调用、shareReplay 配置,以及 TypeScript 版本和严格类型报错。
## 追问
### 最大的破坏性变化是什么?
toPromise 被废弃,应该改成 firstValueFrom 或 lastValueFrom。前者拿第一个值后退订,后者等流 complete 后取最后一个值。
### throwError 为什么改成工厂函数?
RxJS 7 推荐 throwError(() => error),错误对象会在订阅时创建,堆栈更准确。
### 导入路径需要怎么改?
多数项目仍从 rxjs 和 rxjs/operators 导入。更重要的是清理 rxjs/internal/* 深层导入。
### shareReplay 有什么坑?
缓存 HTTP 时常写 shareReplay(1),但要考虑 refCount 和重置策略,否则长期服务可能一直持有缓存。
## 写段代码
```javascript
const source$ = throwError(() => new Error('request failed'));
const value = await firstValueFrom(apiResult$);
```服务端5月30日 21:21
RxJS 在 Angular 项目中通常怎么用?在 Angular 里,RxJS 主要用来处理会随时间变化的数据:HTTP 请求、路由参数、响应式表单、组件事件和应用状态。HttpClient 返回 Observable,路由的 paramMap、表单的 valueChanges 也是 Observable,所以 Angular 项目不是额外引入 RxJS,而是日常开发天然会碰到它。
## 追问
### HTTP 为什么返回 Observable 而不是 Promise?
Observable 可以取消、组合,也能和表单、路由这些流保持同一套写法。单次 HTTP 看起来和 Promise 差不多,但需要重试、超时、取消旧请求时更顺手。
### AsyncPipe 解决了什么问题?
它自动订阅 Observable,并在组件销毁时退订,减少内存泄漏。复杂副作用仍应放在组件或服务中组织。
### 表单搜索一般怎么写?
监听 valueChanges,先过滤空值,再防抖、去重,最后用 switchMap 请求接口。关键是旧请求要能被取消。
### BehaviorSubject 适合做全局状态吗?
小型状态可以,比如当前用户、筛选条件、侧边栏开关。状态变多或需要调试时,NgRx、Signal Store 更稳。
## 写段代码
```ts
results$ = this.form.controls.keyword.valueChanges.pipe(
filter(v => !!v && v.length >= 2), debounceTime(300), distinctUntilChanged(),
switchMap(q => this.api.search(q))
);
```前端5月30日 21:21
RxJS 性能优化应该从哪些地方下手?RxJS 性能优化先看三件事:有没有重复订阅,有没有处理过多无效事件,有没有订阅生命周期失控。多数慢不是操作符本身慢,而是同一个 HTTP Observable 被订阅多次、输入框每个字符都打接口、组件销毁后流还在跑。优化时不要一上来堆操作符,先用浏览器 Network、Performance 和简单日志确认瓶颈在哪里。
## 追问
### share 和 shareReplay 应该怎么选?
share 只共享当前订阅期,晚来的订阅者拿不到历史值。shareReplay(1) 会回放最近一次结果,适合 HTTP 缓存,但要注意 refCount 和生命周期。
### 为什么搜索框通常用 switchMap?
用户连续输入时,旧关键词的请求结果已经没价值了。switchMap 会取消上一次内部订阅,避免慢请求晚返回覆盖新结果。
### mergeMap 限制并发有什么意义?
批量请求如果不限制并发,浏览器连接数、服务端限流和内存都会被打满。可以用第二个参数控制同时执行数量。
### 怎么避免内存泄漏?
模板里优先用 async pipe;必须手动订阅时,用 takeUntilDestroyed() 或明确的 takeUntil(destroy$)。
## 写段代码
```ts
from(ids).pipe(
mergeMap(id => api.load(id), 3)
).subscribe();
```服务端5月28日 08:26
RxJS Marble Testing 怎么写?弹珠测试核心用法与面试要点## 什么是 Marble Testing
RxJS 的异步数据流测试一直是前端开发中的难点——回调嵌套、定时器模拟、异步断言让测试代码既冗长又脆弱。Marble Testing 是 RxJS 官方提供的一种解决方案:用简短的字符串(称为 marble 弹珠字符串)可视化地描述 Observable 的时间线和事件,再由 TestScheduler 在虚拟时间中同步执行,把原本需要等待真实异步的测试变成瞬时可验证的同步断言。
一句话概括:**Marble Testing = 弹珠字符串 + TestScheduler = 用可视化语法写同步的异步测试**。
## Marble 语法速查
### 核心符号
| 符号 | 含义 | 示例 |
|------|------|------|
| `-` | 时间流逝(1 帧,约 10ms) | `---` 表示 30ms |
| `a-z` | 发出的值 | `-a-b-` 发出 a 和 b |
| `\|` | 完成 | `-a-b-\|` 发出后完成 |
| `#` | 错误 | `-a-#` 发出 a 后抛错 |
| `()` | 同步分组 | `(abc\|)` 同步发出 a、b、c 后完成 |
| `^` | 订阅点(hot Observable) | `^-a-b-` 从订阅点开始接收 |
| `!` | 取消订阅 | `^-a-!` 订阅后收到 a 就取消 |
### 常见 marble 字符串解读
```typescript
// 冷 Observable:从订阅时开始
cold('-a-b-c-|')
// → 10ms 发出 a,20ms 发出 b,30ms 发出 c,40ms 完成
cold('-a-b-#')
// → 10ms 发出 a,20ms 发出 b,30ms 报错
cold('(abc|)')
// → 同步发出 a、b、c,然后立即完成
// 热 Observable:从 ^ 标记处开始接收
hot('--a--b--c--|', { a: 1, b: 2, c: 3 })
// → ^ 之前的历史值对新订阅者不可见
```
## TestScheduler 基本用法
### 初始化 TestScheduler
```typescript
import { TestScheduler } from 'rxjs/testing';
let testScheduler: TestScheduler;
beforeEach(() => {
testScheduler = new TestScheduler((actual, expected) => {
// 深比较实际输出与期望输出
expect(actual).toEqual(expected);
});
});
```
> 关键点:`run()` 回调内提供的 `cold`、`hot`、`expectObservable`、`expectSubscriptions` 是测试的四大工具,不要在 `run()` 外部使用它们。
### 测试 map 操作符
```typescript
it('应将每个值转为大写', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c-|', { a: 'hello', b: 'world', c: 'rxjs' });
const expected = '-A-B-C-|';
const result$ = source$.pipe(map(x => x.toUpperCase()));
expectObservable(result$).toBe(expected, {
A: 'HELLO', B: 'WORLD', C: 'RXJS'
});
});
});
```
> 为什么要传值映射?当 marble 字符串中的字母与实际值不同时,必须通过第二个参数映射,否则默认值就是字母本身。
## 面试高频:时间类操作符测试
时间相关操作符是 Marble Testing 最核心的应用场景,因为传统方式很难精确控制时间。
### delay
```typescript
it('应延迟 30ms 发出值', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-|');
const expected = '---a-b-|';
const result$ = source$.pipe(delay(30, testScheduler));
expectObservable(result$).toBe(expected);
});
});
```
### debounceTime
```typescript
it('应在 20ms 无新值后才发出', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a--b--c---|');
const expected = '-----b--c---|';
const result$ = source$.pipe(debounceTime(20, testScheduler));
expectObservable(result$).toBe(expected);
});
});
```
### throttleTime
```typescript
it('应每 30ms 最多发出一个值', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-ab-cde-f-|');
const expected = '-a---d--f-|';
const result$ = source$.pipe(throttleTime(30, testScheduler));
expectObservable(result$).toBe(expected);
});
});
```
> 面试追问:debounceTime 和 throttleTime 的区别?前者等"安静期"再发,后者等"冷却期"再放行——两者在 marble 图上表现为截然不同的输出模式。
## 组合操作符的测试
### merge:交错合并
```typescript
it('应交错合并两个流', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source1$ = cold('-a---b-|');
const source2$ = cold('--c-d---|');
const expected = '-a-c-b-d-|';
const result$ = merge(source1$, source2$);
expectObservable(result$).toBe(expected);
});
});
```
### concat:顺序拼接
```typescript
it('应顺序拼接两个流', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source1$ = cold('-a-b-|');
const source2$ = cold('--c-d-|');
const expected = '-a-b--c-d-|';
const result$ = concat(source1$, source2$);
expectObservable(result$).toBe(expected);
});
});
```
### combineLatest:取最新组合
```typescript
it('应在任一流发出时组合最新值', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source1$ = cold('-a---b-|', { a: 1, b: 2 });
const source2$ = cold('--c-d---|', { c: 10, d: 20 });
const expected = '----xy-z|';
const result$ = combineLatest([source1$, source2$]);
expectObservable(result$).toBe(expected, {
x: [1, 20], y: [2, 20], z: [2, 20]
});
});
});
```
> 面试追问:combineLatest 为什么第一个输出是 `[1, 20]` 而不是 `[1, 10]`?因为 combineLatest 要求每个源至少发出一次后才开始组合——source1$ 发出 a=1 时 source2$ 还没发出过值,直到 source2$ 发出 d=20 时两个流才都有值,此时组合的是 source1$ 的最新值 1 和 source2$ 的最新值 20。
## 错误处理测试
### catchError
```typescript
it('应捕获错误并返回替代值', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-#');
const expected = '-a-b-(d|)';
const result$ = source$.pipe(
catchError(() => of('d'))
);
expectObservable(result$).toBe(expected);
});
});
```
### retry
```typescript
it('应在出错时重试一次', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-#');
const expected = '-a-a-#';
const result$ = source$.pipe(retry(1));
expectObservable(result$).toBe(expected);
});
});
```
> 注意 `(d|)` 的括号:catchError 返回的 of('d') 是同步发出再完成的,在 marble 中必须用括号分组。
## 订阅与取消订阅验证
`expectSubscriptions` 专门验证 Observable 何时被订阅、何时被取消订阅,这是面试中区分"会用"和"理解原理"的分水岭。
```typescript
it('应在 take(2) 后自动取消订阅', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source$ = cold('-a-b-c-d-|');
const sub = '^---!';
const result$ = source$.pipe(take(2));
expectObservable(result$).toBe('-a-b-|');
expectSubscriptions(source$.subscriptions).toBe(sub);
});
});
```
> `^---!` 表示:订阅开始(^),经历 3 帧(---),取消订阅(!)。这验证了 take(2) 在收到第二个值后确实取消了上游订阅。
## 实战场景
### 搜索防抖
```typescript
it('应对输入做防抖后发起搜索', () => {
testScheduler.run(({ cold, expectObservable }) => {
const input$ = cold('-a--b---c-|');
const expected = '-----b---c-|';
const result$ = input$.pipe(
debounceTime(20, testScheduler),
distinctUntilChanged(),
switchMap(q => search(q))
);
expectObservable(result$).toBe(expected);
});
});
```
### 轮询与停止
```typescript
it('应按间隔轮询并在获取足够数据后停止', () => {
testScheduler.run(({ expectObservable }) => {
const expected = '-a-b-c-d-e-|';
const result$ = interval(10, testScheduler).pipe(
take(5),
map(i => String.fromCharCode(97 + i))
);
expectObservable(result$).toBe(expected);
});
});
```
## Hot Observable 的测试
Hot Observable 在订阅前就已经开始发出值,测试时用 `^` 标记订阅起点,之后才能收到值。
```typescript
it('应只接收订阅后的值', () => {
testScheduler.run(({ hot, expectObservable }) => {
const source$ = hot('--a--b--c--|');
const sub = '---^-------!';
const expected = '--b--c--|';
const result$ = source$.pipe(take(2));
expectObservable(result$, sub).toBe(expected);
});
});
```
> 面试追问:cold 和 hot 的本质区别?cold 每次订阅都重新开始,数据对每个订阅者独立;hot 共享同一个数据源,新订阅者只能收到订阅后的值。
## 常见陷阱
**1. marble 字符串长度必须对齐**
```typescript
// 错误:长度不一致
const source$ = cold('-a-b-|');
const expected = '-A-B-C-|'; // 多了 C,长度不匹配
// 正确:每个位置一一对应
const source$ = cold('-a-b-|');
const expected = '-A-B-|';
```
**2. 不要忘记传 TestScheduler**
```typescript
// 错误:使用了真实的 setTimeout
source$.pipe(debounceTime(100));
// 正确:传入 testScheduler 使用虚拟时间
source$.pipe(debounceTime(100, testScheduler));
```
**3. run() 内部不要使用真实异步**
```typescript
// 错误:run() 内用了 setInterval
testScheduler.run(() => {
setInterval(() => {}, 100); // 会干扰虚拟时间
});
```
**4. 值映射与默认值**
当 marble 字母就是你想表达的值时,可以省略映射对象;但当字母与实际值不同(如 `a` 代表 `1`),必须显式传入。
前端5月28日 08:23
RxJS 中 Hot Observable 和 Cold Observable 有什么区别?## 先搞清楚一个核心:数据生产者在哪
Cold 和 Hot 的本质区别只有一个:**数据生产者(Producer)是在订阅时创建,还是在 Observable 创建时就已经存在?**
- **Cold Observable**:生产者在订阅时才创建,每个订阅者拿到独立的生产者,这就是"单播"(Unicast)
- **Hot Observable**:生产者在 Observable 创建时就已经存在,所有订阅者共享同一个生产者,这就是"多播"(Multicast)
理解了这一点,后面所有特性都能推导出来,不需要死记硬背。
## Cold Observable:按需执行,人手一份
Cold Observable 是"惰性"的——没有人订阅,它什么都不做。每次有新订阅者,它都会从头执行一遍逻辑,产生一份独立的数据流。
```javascript
import { Observable } from 'rxjs';
const cold$ = new Observable(subscriber => {
console.log('执行逻辑');
subscriber.next(Math.random());
subscriber.complete();
});
cold$.subscribe(v => console.log('订阅者A:', v));
// 执行逻辑 → 订阅者A: 0.314159
cold$.subscribe(v => console.log('订阅者B:', v));
// 执行逻辑 → 订阅者B: 0.271828
// 两次订阅各执行一次,随机值不同——因为每个订阅者有独立的生产者
```
用一个生活类比:Cold Observable 像**电影院的电影文件**——每个观众点播时,影院单独为他播放一份,各看各的进度,互不影响。
**常见 Cold 操作符**:`of()`、`from()`、`interval()`、`timer()`、`ajax()`、Angular 的 `HttpClient.get()`
## Hot Observable:共享数据流,先到先得
Hot Observable 是"主动"的——不管有没有人订阅,生产者都在运作。新订阅者只能收到订阅之后的数据,之前发过的就错过了。
```javascript
import { Subject } from 'rxjs';
const subject = new Subject();
subject.subscribe(v => console.log('订阅者A:', v));
subject.next(1); // 订阅者A: 1
subject.subscribe(v => console.log('订阅者B:', v));
subject.next(2); // 订阅者A: 2, 订阅者B: 2
// 订阅者B 没收到 1,因为订阅晚了
```
类比:Hot Observable 像**电视直播**——频道一直在播,你打开电视只能看到当前和后续的节目,之前的已经播完了回不来。
**常见 Hot 来源**:`Subject` 及其变体(`BehaviorSubject`、`ReplaySubject`、`AsyncSubject`)、`fromEvent()` 绑定的 DOM 事件、WebSocket 连接
## 单播 vs 多播:从源码角度理解
Cold Observable 的 `subscribe` 函数里直接创建生产者:
```javascript
// Cold:每次 subscribe 都执行这个函数,各订阅者独立
const cold$ = new Observable(subscriber => {
const source = createProducer(); // 每个订阅者创建自己的生产者
source.onData(data => subscriber.next(data));
});
```
Hot Observable 的生产者在外部,`subscribe` 只是注册监听:
```javascript
// Hot:生产者已存在,subscribe 只是往里注册回调
const hot$ = new Observable(subscriber => {
externalSource.addListener(data => subscriber.next(data));
// 所有订阅者监听同一个 externalSource
});
```
所以 Cold → Hot 的本质就是**把内部生产者提到外部,让多个订阅者共享**。
## Cold 转 Hot 的三种方式
### share()——最常用
`share()` 内部使用 `Subject` 实现多播,并且带 `refCount` 机制:当订阅者数从 1 降到 0 时自动断开上游,再有新订阅者时重新连接。
```javascript
import { interval } from 'rxjs';
import { share, take } from 'rxjs/operators';
const source$ = interval(1000).pipe(take(5));
const shared$ = source$.pipe(share());
shared$.subscribe(v => console.log('A:', v));
setTimeout(() => shared$.subscribe(v => console.log('B:', v)), 2000);
// A 和 B 共享同一个 interval 计时器
// B 在第2秒加入,只能收到 2、3、4
```
### shareReplay(n)——缓存最近 n 个值
`shareReplay` 在 `share` 的基础上缓存最近的 n 个值,新订阅者能立即收到缓存数据,解决"来晚了错过数据"的问题。
```javascript
import { interval } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';
const source$ = interval(1000).pipe(take(5), shareReplay(1));
source$.subscribe(v => console.log('A:', v));
setTimeout(() => {
source$.subscribe(v => console.log('B:', v));
// B 立即收到缓存的最新的一个值,然后继续接收后续值
}, 3000);
```
**关键区别**:`share()` 的 refCount 在订阅者归零后断开上游,而 `shareReplay()` 默认不会断开(可通过 `config.resetOnComplete` 等参数调整)。
### publish() + connect()——手动控制
`publish()` 把 Cold Observable 变成 `ConnectableObservable`,必须手动调用 `connect()` 才开始执行。适合需要先注册所有订阅者再启动数据流的场景。
```javascript
import { interval } from 'rxjs';
import { publish, take } from 'rxjs/operators';
const source$ = interval(1000).pipe(take(5), publish());
source$.subscribe(v => console.log('A:', v));
source$.subscribe(v => console.log('B:', v));
// 此时不执行,等所有订阅者就绪
source$.connect(); // 手动启动
```
## 实际开发中的选择
### 用 Cold 的场景
- **HTTP 请求**:每个组件独立获取数据,互不干扰
- **独立计算**:每个订阅者需要各自的处理结果
- **可重复执行**:每次订阅都希望从头获取完整数据
### 用 Hot 的场景
- **共享 HTTP 结果**:多个组件需要同一接口的数据,用 `shareReplay(1)` 避免重复请求
- **事件监听**:DOM 事件、WebSocket 消息天然就是多播的
- **状态管理**:`BehaviorSubject` 持有最新状态,新订阅者立即获取当前值
## 最容易踩的坑
### 坑1:忘记共享导致重复请求
```javascript
// 每次订阅都发新请求——大忌
const data$ = http.get('/api/data');
data$.subscribe(handle1);
data$.subscribe(handle2); // 又发了一次请求
// 用 shareReplay 共享
const data$ = http.get('/api/data').pipe(shareReplay(1));
data$.subscribe(handle1);
data$.subscribe(handle2); // 只发一次请求
```
### 坑2:share() 的 refCount 陷阱
```javascript
const source$ = interval(1000).pipe(share());
const sub1 = source$.subscribe(v => console.log('A:', v));
const sub2 = source$.subscribe(v => console.log('B:', v));
sub1.unsubscribe();
sub2.unsubscribe();
// 所有订阅者都取消后,上游停止
source$.subscribe(v => console.log('C:', v));
// 重新订阅,上游重新连接,C 从0开始收数据
// 如果这里用 shareReplay(1),行为可能不同
```
### 坑3:shareReplay 缓存过多
```javascript
// 缓存1000个值,内存会爆
interval(1000).pipe(shareReplay(1000));
// 通常缓存1个就够了
interval(1000).pipe(shareReplay(1));
```
## 一张表总结
| 特性 | Cold Observable | Hot Observable |
|------|----------------|----------------|
| 生产者创建时机 | 订阅时 | Observable 创建时 |
| 数据流 | 每个订阅者独立 | 所有订阅者共享 |
| 传播方式 | 单播(Unicast) | 多播(Multicast) |
| 错过数据 | 不会,每次从头 | 会,只能收订阅后的数据 |
| 典型代表 | `of`、`from`、HTTP | `Subject`、DOM 事件 |
| 转 Hot | `share()`、`shareReplay()` | 不可转 Cold |
记住核心判断:**看生产者——订阅时创建就是 Cold,早就存在就是 Hot。** 面试中如果能从生产者角度解释单播/多播的区别,再提到 `share` 的 refCount 机制和 `shareReplay` 的缓存策略,基本就能拿到高分。前端5月27日 21:47
RxJS 中 switchMap、mergeMap、concatMap 有什么区别?常用操作符有哪些?## 答案:六类核心操作符
面试中最常考的 RxJS 操作符分六类:
**创建类**:`of`、`from`、`interval`、`timer` — 把同步值、数组、Promise、定时器转为 Observable。
**转换类**:`map` 逐值转换;`switchMap` 切换到新 Observable 并**取消前一个**(搜索框场景首选);`mergeMap` 并行处理所有内部 Observable(不取消,不保证顺序);`concatMap` 顺序处理,上一个完成才订阅下一个。
**过滤类**:`filter` 条件过滤;`take(n)` 只取前 n 个;`takeUntil(notifier)` notifier 发出时终止;`debounceTime` 停顿后才发出(防抖);`throttleTime` 间隔内只取第一个(节流)。
**组合类**:`merge` 并行合并;`concat` 顺序连接;`combineLatest` 任一更新都组合最新值;`zip` 按索引一一配对。
**错误处理类**:`catchError` 捕获并返回替代 Observable;`retry(n)` 失败后自动重试 n 次。
**工具类**:`tap` 执行副作用不改值(调试利器);`delay` 延迟发射;`distinctUntilChanged` 连续重复值只发一次。
### switchMap vs mergeMap vs concatMap:面试最高频对比
三者的核心区别在于**对前一个内部 Observable 的处理策略**:
- **switchMap**:新值到来 → 取消上一个 → 订阅新的。搜索框、路由切换等竞态场景必选,保证只拿到最新结果。
- **mergeMap**:全部并行订阅,互不干扰。适合批量请求、日志上报等不关心顺序的场景。可传第二参数控制并发数。
- **concatMap**:排队串行,上一个完成才订阅下一个。适合顺序敏感的写操作,如依次保存表单步骤。
```javascript
// switchMap:搜索框 — 自动取消旧请求,避免竞态
fromEvent(input, 'input').pipe(
debounceTime(300),
switchMap(query => http.get('/search', { query }))
)
// mergeMap:批量删除 — 限制3并发
ids.pipe(mergeMap(id => http.delete(`/item/${id}`), 3))
// concatMap:串行保存 — 保证顺序
actions.pipe(concatMap(action => http.post('/save', action)))
```
选错操作符的后果:搜索场景用 mergeMap 会显示旧请求结果覆盖新结果;表单保存用 mergeMap 可能后发先至导致数据错乱。
### 高频实战组合模式
```javascript
// 搜索防抖 + 去重 + 空值过滤 + 自动取消旧请求
searchInput.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => query.length > 0 ? searchAPI(query) : EMPTY)
)
// 表单自动保存(去重 + 防抖)
form.valueChanges.pipe(
debounceTime(1000),
distinctUntilChanged(),
switchMap(data => saveAPI(data))
)
```
### 追问方向
- `combineLatest` vs `zip` 的区别?— combineLatest 任一更新即触发输出,zip 等所有 Observable 都发了新值才按索引配对
- 如何避免内存泄漏?— `takeUntil(destroy$\)` 模式,组件销毁时 `destroy$.next()` 统一取消,优于手动 unsubscribe
- `shareReplay(1)` 解决什么问题?— 多订阅者共享同一数据源并缓存最新值,避免重复 HTTP 请求
- `exhaustMap` 是什么?— 忽略新值直到当前内部 Observable 完成,适合防止重复提交(登录按钮连击)
- `forkJoin` vs `combineLatest`?— forkJoin 等所有完成后只取各自最后一个值,类似 Promise.all;combineLatest 每次变化都触发前端5月27日 21:45
RxJS 中的 Observable 和 Promise 有什么区别?## 核心答案
Observable 和 Promise 都是处理异步操作的工具,关键区别在四个字:**单值/多值、惰性/急切、可取消/不可取消、单播/多播**。
| 维度 | Promise | Observable |
|------|---------|------------|
| 值的数量 | 只能 resolve 一次 | 可以 next 多次 |
| 执行时机 | 创建即执行(eager) | 订阅才执行(lazy) |
| 取消 | 无法取消 | unsubscribe() 取消 |
| 多播 | 同一 Promise 多次 then 共享同一结果 | 默认冷 Observable,每次订阅独立执行 |
| 错误重试 | 需手动包装 | retry 操作符一行搞定 |
| 操作符 | then/catch/finally | map、filter、switchMap 等上百个 |
## 逐条展开
**1. 单值 vs 多值**
Promise 只能 resolve 一次,后续调用被忽略;Observable 通过 next 可以持续推送数据。
```javascript
// Promise — 只拿到 1
const p = new Promise(resolve => {
resolve(1);
resolve(2); // 无效
});
// Observable — 拿到 1, 2, 3
const obs$ = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
```
**2. 惰性 vs 急切**
Promise 构造函数里的代码在 new 时就跑了;Observable 的回调要等 subscribe 才执行。这意味着不订阅的 Observable 什么也不做,省资源。
**3. 可取消**
这是面试高频追问点。Promise 一旦创建就跑到完,想中途撤销只能用第三方库包装。Observable 原生支持:
```javascript
const sub = interval(1000).subscribe(x => console.log(x));
setTimeout(() => sub.unsubscribe(), 3000); // 3秒后停止
```
**4. 冷 vs 热(单播 vs 多播)**
冷 Observable 每次订阅都重新执行一遍;热 Observable(如 Subject)多个订阅者共享同一数据源。Promise 天然是"热"的——创建后结果确定,所有 then 拿到同一个值。
```javascript
// 冷 Observable — 两个订阅各跑一次
const cold$ = of(Math.random());
cold$.subscribe(v => console.log(v)); // 0.123
cold$.subscribe(v => console.log(v)); // 0.456(不同值)
// 用 share() 变热
const hot$ = cold$.pipe(share());
```
**5. 错误处理与重试**
Promise 出错只能 catch 一次,重试要自己写循环。Observable 有 retry、retryWhen 等操作符,一行搞定指数退避重试。
```javascript
// Promise — 手动重试
function retryFetch(url, n) {
return fetch(url).catch(err => n > 0 ? retryFetch(url, n - 1) : Promise.reject(err));
}
// Observable — 一行重试
http.get(url).pipe(retry(3));
```
**6. 操作符生态**
Promise 只有 then 链,复杂异步编排(竞速、合并、去抖)要手写。Observable 有 switchMap、mergeMap、debounceTime、combineLatest 等操作符,声明式描述数据流。
## 面试追问方向
- **什么时候用 Promise 就够了?** 单次异步请求、不需要取消、不需要流式处理——Promise 更简单。
- **Observable 能完全替代 Promise 吗?** 理论上可以(from(promise) 转换),但简单场景用 Observable 是过度设计。
- **Angular 为什么选 Observable?** HTTP 请求可能需取消(路由离开),表单值变化是流,组件生命周期也是流——Observable 统一了这些模型。
前端5月27日 21:42
如何在 RxJS 中防止内存泄漏?## 核心答案
RxJS 内存泄漏的根因是**订阅了 Observable 却未取消订阅**,导致回调闭包持有外部引用,阻止垃圾回收。防止泄漏的关键就一条:**确保每个订阅都有取消的时机**。
最推荐的方式是 `takeUntil` 模式:
```typescript
private destroy$ = new Subject<void>();
ngOnInit() {
this.http.get('/api/data').pipe(
takeUntil(this.destroy$)
).subscribe(data => this.data = data);
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
```
组件销毁时 `destroy$` 发出通知,所有通过 `takeUntil` 管道的订阅自动完成,闭包释放,GC 可回收相关内存。
## 哪些 Observable 必须取消订阅
判断标准:**不会自动 complete 的流必须手动取消**。
- `interval`、`timer` —— 持续发射,永不完成
- `fromEvent` —— 事件监听,永不完成
- `Subject` / `BehaviorSubject` —— 需手动调 `complete()`
- Angular 的 `params`、`valueChanges` —— 持续流
HTTP 请求 `httpClient.get()` 发射一次后自动 complete,理论上不必取消,但用 `takeUntil` 也不亏——能顺便中断请求。
## 三种取消策略对比
**1. `takeUntil`** —— 最推荐,声明式,一个 `destroy$` 管所有订阅
**2. `Subscription` 聚合** —— `new Subscription()` + `.add()`,批量 `unsubscribe()`,适合非组件场景
**3. `take(1)` / `first()`** —— 只需首个值时用,取完自动 complete,注意如果流不发射也不会自动取消
## 容易踩的坑
**嵌套订阅**:外层订阅的回调里再 subscribe,内层订阅完全失控。用 `switchMap` 替代——它自动取消前一次内部订阅:
```typescript
// 嵌套订阅,内层泄漏
this.http.get('/user').subscribe(user => {
this.http.get(`/posts/${user.id}`).subscribe(...);
});
// switchMap 自动管理
this.http.get('/user').pipe(
switchMap(user => this.http.get(`/posts/${user.id}`))
).subscribe(posts => ...);
```
**闭包引用大对象**:订阅回调捕获外部变量,即使该变量不再使用,只要订阅存活就无法回收。取消订阅即释放闭包引用。
**服务中的订阅**:Service 生命周期等于应用生命周期,在里面 `.subscribe()` 几乎不可能取消。正确做法是返回 Observable,让调用方决定何时订阅和取消。
## 追问方向
- `takeUntil` 和 `unsubscribe()` 有什么区别?——前者在管道中完成流,后者是外部强制中断;`takeUntil` 更符合声明式风格
- `switchMap`、`concatMap`、`mergeMap` 哪个能防止内存泄漏?——`switchMap` 自动取消前一次,其余不会;多对多场景需配合 `takeUntil`
- 如何检测 RxJS 内存泄漏?——Chrome DevTools Memory 面板拍快照,对比组件销毁前后 retained size;或在 `ngOnDestroy` 打日志确认是否执行前端5月27日 21:29
RxJS 中如何处理背压(Backpressure)问题?## 什么是背压
RxJS 中 Observable 默认是推模型——生产者决定发射节奏。当数据产生速度超过消费者处理速度时,未处理的值会堆积在内存中,这就是背压问题。典型场景包括:高频 DOM 事件、WebSocket 消息流、快速轮询的传感器数据。
RxJS 不像 RxJava 有 Flowable 这种原生背压类型,它的策略是"丢、缓、换"三种思路。
## 丢:节流与采样
不需要每一条数据时,主动丢弃多余值:
- **throttleTime**:时间窗口内只取第一个,适合 scroll/resize 等高频事件
- **debounceTime**:等输入静止后再发射,适合搜索框联想
- **sampleTime**:周期性取最新值,中间值全部丢弃
- **auditTime**:周期性取窗口末尾值,与 sample 语义不同
```typescript
// 滚动事件:200ms 内只处理一次
fromEvent(window, 'scroll').pipe(
throttleTime(200)
)
// 搜索框:停止输入 300ms 后请求
fromEvent(input, 'input').pipe(
debounceTime(300),
switchMap(e => search(e.target.value))
)
```
关键区别:throttle 保证有规律地采样,debounce 保证只在"安静"后触发,面试中经常要求区分二者。
## 缓:批量打包
需要全部数据但可以延迟处理时,把值攒起来一起发:
- **bufferTime / bufferCount**:按时间或数量打包成数组
- **windowTime / windowCount**:类似 buffer,但输出内层 Observable 而非数组
```typescript
// 每 500ms 打包一次传感器数据
sensor$.pipe(bufferTime(500)).subscribe(batch => {
processBatch(batch); // 一次处理多条
});
```
buffer 的风险是缓冲区无限增长,生产速度持续超过消费速度时仍然会内存溢出。实际项目中建议配合 take 或设置上限。
## 换:切换并发模型
改变数据消费方式来匹配生产速度:
- **concatMap**:完全串行,一个结束再发下一个,最安全但最慢
- **mergeMap(n)**:限制并发数为 n,兼顾吞吐和资源
- **switchMap**:只保留最新请求,自动取消前一个
```typescript
// 并发限制为 3 的批量请求
from(ids).pipe(
mergeMap(id => fetchUser(id), 3)
)
// 搜索场景:自动取消旧请求
input$.pipe(
debounceTime(300),
switchMap(query => searchAPI(query))
)
```
面试高频追问:mergeMap、concatMap、switchMap、exhaustMap 四者的区别。核心区分点在于——新值到来时,是排队(concat)、并行(merge)、取消旧的(switch)还是忽略新的(exhaust)。
## 实际选型思路
面试中更看重你能否根据场景选对策略,而不是背诵 API:
| 场景 | 推荐策略 | 理由 |
|---|---|---|
| 滚动/resize 事件 | throttleTime | 只需采样,丢数据无害 |
| 搜索输入联想 | debounceTime + switchMap | 避免无效请求,自动取消旧请求 |
| 批量 API 请求 | mergeMap(n) | 控制并发,兼顾效率 |
| 有序上传文件 | concatMap | 顺序保证,避免竞争 |
| 高频传感器数据 | bufferTime | 批量处理更高效 |
RxJS 没有真正的拉取式背压,本质上是"有损控制"。如果业务要求零丢数据,需要考虑在架构层面引入队列或换用支持 Flowable 的方案。前端5月27日 21:26
RxJS 中如何创建自定义操作符?## 核心答案
RxJS 创建自定义操作符有三种方式,优先使用**组合现有操作符**,其次用 **pipeable 函数**,避免直接 new Observable:
```typescript
// 1. 组合现有操作符(推荐)
function debounceSearch(ms = 300) {
return pipe(
debounceTime(ms),
distinctUntilChanged()
);
}
// 2. pipeable 函数
function debug<T>(label: string): OperatorFunction<T, T> {
return source$ => source$.pipe(
tap(v => console.log(label, v))
);
}
```
## 三种方式的区别
| 方式 | 适用场景 | 复杂度 |
|------|---------|--------|
| 组合现有操作符 | 逻辑由已有操作符拼装即可 | 低 |
| pipeable 函数(返回 `source$ => new Observable`) | 需要精细控制订阅和退订 | 中 |
| 直接 new Observable | 极端定制场景,需手动管理全部生命周期 | 高 |
直接 new Observable 需要自己处理 next/error/complete 和 teardown,容易遗漏导致内存泄漏,非必要不使用。
## 追问:操作符内如何保证资源释放?
返回的 Observable 订阅时必须返回 teardown 函数:
```typescript
function withTimeout<T>(ms: number): OperatorFunction<T, T> {
return source$ => new Observable(subscriber => {
const timer = setTimeout(() => subscriber.error(new Error('timeout')), ms);
const sub = source$.subscribe({
next: v => subscriber.next(v),
error: e => subscriber.error(e),
complete: () => subscriber.complete()
});
// teardown:取消定时器 + 退订上游
return () => { clearTimeout(timer); sub.unsubscribe(); };
});
}
```
忘记返回 teardown 是自定义操作符最常见 bug,会导致定时器、子订阅等资源无法回收。
## 追问:RxJS 7+ 写法有什么变化?
`retryWhen`、`tap(next, error, complete)` 三参数重载等已在 RxJS 7 中弃用或移除。自定义操作符应使用 `retry({ count, delay })` 等新 API,组合操作符时优先用 `pipe()` 而非链式调用。前端5月27日 21:26
RxJS 中如何处理错误?有哪些错误处理操作符?## 核心答案
RxJS 中 Observable 一旦出错,整个流就会终止。常用的错误处理操作符有五个:
- **catchError** — 捕获错误,返回替代 Observable,流继续
- **retry(n)** — 出错后重新订阅,最多重试 n 次
- **retryWhen** — 自定义重试策略(延迟、指数退避等)
- **finalize** — 流结束(无论成功还是出错)时执行清理
- **onErrorResumeNext** — 出错后跳到下一个 Observable 继续
其中 catchError 是面试最常考的,关键在于理解它的位置决定行为:放在 `mergeMap` 内部只捕获单条流错误,放在外部则整个流被替换。
## catchError 的位置陷阱
```typescript
// 错误写法:外层 catchError,一条失败整条流终止
source$.pipe(
mergeMap(id => fetchData(id)),
catchError(() => of(fallback)) // 任一请求失败,后续全部跳过
);
// 正确写法:内层 catchError,单条失败不影响其他
source$.pipe(
mergeMap(id => fetchData(id).pipe(
catchError(() => of(fallback)) // 只替换这一条
))
);
```
面试中经常追问这个区别:内层捕获让每条数据流独立容错,外层捕获则是兜底策略。
## retry 与 retryWhen 怎么选
`retry(3)` 简单粗暴,立刻重试三次。实际项目中更常见带延迟的重试:
```typescript
source$.pipe(
retryWhen(errors =>
errors.pipe(
scan((count, err) => {
if (count >= 3) throw err;
return count + 1;
}, 0),
delayWhen(count => timer(Math.pow(2, count) * 1000))
)
)
);
```
指数退避重试是生产环境的标准做法,面试能答出这个基本过关。注意 RxJS 7 之后 `retryWhen` 已废弃,推荐用 `retry({ delay: ... })` 替代。
## finalize 不是 finally
`finalize` 无论成功、出错还是取消订阅都会执行,适合释放资源(关闭连接、清除定时器等)。它不接收参数,拿不到错误信息,别跟 Promise 的 finally 混淆。
## 追问方向
- catchError 返回 throwError 会怎样?——错误继续向上传播
- retry 的重试是重新订阅还是重新执行?——重新订阅整个上游
- Observable 出错后订阅者还能收到值吗?——不能,流已终止前端5月27日 21:25
RxJS 中的调度器(Scheduler)是什么?如何使用?## 调度器是什么
调度器决定 RxJS 中通知(next/error/complete)的执行时机和上下文。简单说:**它控制一段 Observable 逻辑"在什么时候跑"——同步、微任务、宏任务还是动画帧。**
RxJS 内置 5 种调度器,核心区别如下:
| 调度器 | 底层机制 | 典型场景 |
|--------|----------|----------|
| null(同步) | 直接递归调用 | 默认,少量数据 |
| queueScheduler | 同步蹦床调度 | 递归/迭代,防栈溢出 |
| asapScheduler | Promise.then(微任务) | 优先级高于宏任务的异步 |
| asyncScheduler | setInterval(宏任务) | 延迟、定时操作 |
| animationFrameScheduler | requestAnimationFrame | 浏览器动画 |
## 怎么指定调度器
三种方式:
**1. 创建时传入**——作为操作符最后一个参数:
```typescript
of(1, 2, 3, asyncScheduler).subscribe(console.log);
```
**2. observeOn / subscribeOn**:
- `observeOn(asyncScheduler)`:下游通知切到异步调度
- `subscribeOn(asyncScheduler)`:上游订阅动作切到异步调度
二者区别常被追问:`subscribeOn` 影响的是"什么时候开始执行 Observable 逻辑",`observeOn` 影响的是"下游在哪个调度器上接收通知"。
**3. schedule 方法**——手动调度:
```typescript
asyncScheduler.schedule(() => console.log('1s后'), 1000);
```
## 面试高频追问
**asapScheduler 和 asyncScheduler 有什么区别?**
asapScheduler 走微任务队列(Promise.then),asyncScheduler 走宏任务队列(setTimeout/setInterval)。微任务优先级更高,会在当前宏任务结束后、下一个宏任务之前执行。需要尽快响应但不阻塞主线程时选 asap,需要真正延迟时选 async。
**queueScheduler 和同步调度器有什么区别?**
同步调度器直接递归调用,大量数据可能栈溢出。queueScheduler 用蹦床调度(trampoline),把递归展开为循环迭代,避免栈溢出同时保持同步语义。
**什么时候该手动指定调度器?**
大多数时候不需要——RxJS 按最小并发原则自动选择默认调度器。手动指定主要出现在三个场景:测试中用 TestScheduler 控制时间、动画中用 animationFrameScheduler 同步渲染帧、需要改变默认执行上下文时。
**如何在测试中控制调度器?**
使用 `TestScheduler`,它提供虚拟时钟,可以用 marble diagram 声明时序并同步断言结果,避免真实等待。
## 选择建议
不需要调度器就不加。需要延迟选 async,需要非阻塞优先级选 asap,递归防溢出选 queue,动画选 animationFrame。过度使用调度器只会增加复杂度和性能开销。