RxJS 中 Hot Observable 和 Cold Observable 有什么区别?
先搞清楚一个核心:数据生产者在哪
Cold 和 Hot 的本质区别只有一个:数据生产者(Producer)是在订阅时创建,还是在 Observable 创建时就已经存在?
- Cold Observable:生产者在订阅时才创建,每个订阅者拿到独立的生产者,这就是"单播"(Unicast)
- Hot Observable:生产者在 Observable 创建时就已经存在,所有订阅者共享同一个生产者,这就是"多播"(Multicast)
理解了这一点,后面所有特性都能推导出来,不需要死记硬背。
Cold Observable:按需执行,人手一份
Cold Observable 是"惰性"的——没有人订阅,它什么都不做。每次有新订阅者,它都会从头执行一遍逻辑,产生一份独立的数据流。
javascriptimport { Observable } from 'rxjs'; const cold$ = new Observable(subscriber => { console.log('执行逻辑'); subscriber.next(Math.random()); subscriber.complete(); }); cold$.subscribe(v => console.log('订阅者A:', v)); // 执行逻辑 → 订阅者A: 0.314159 cold$.subscribe(v => console.log('订阅者B:', v)); // 执行逻辑 → 订阅者B: 0.271828 // 两次订阅各执行一次,随机值不同——因为每个订阅者有独立的生产者
用一个生活类比:Cold Observable 像电影院的电影文件——每个观众点播时,影院单独为他播放一份,各看各的进度,互不影响。
常见 Cold 操作符:of()、from()、interval()、timer()、ajax()、Angular 的 HttpClient.get()
Hot Observable:共享数据流,先到先得
Hot Observable 是"主动"的——不管有没有人订阅,生产者都在运作。新订阅者只能收到订阅之后的数据,之前发过的就错过了。
javascriptimport { Subject } from 'rxjs'; const subject = new Subject(); subject.subscribe(v => console.log('订阅者A:', v)); subject.next(1); // 订阅者A: 1 subject.subscribe(v => console.log('订阅者B:', v)); subject.next(2); // 订阅者A: 2, 订阅者B: 2 // 订阅者B 没收到 1,因为订阅晚了
类比:Hot Observable 像电视直播——频道一直在播,你打开电视只能看到当前和后续的节目,之前的已经播完了回不来。
常见 Hot 来源:Subject 及其变体(BehaviorSubject、ReplaySubject、AsyncSubject)、fromEvent() 绑定的 DOM 事件、WebSocket 连接
单播 vs 多播:从源码角度理解
Cold Observable 的 subscribe 函数里直接创建生产者:
javascript// Cold:每次 subscribe 都执行这个函数,各订阅者独立 const cold$ = new Observable(subscriber => { const source = createProducer(); // 每个订阅者创建自己的生产者 source.onData(data => subscriber.next(data)); });
Hot Observable 的生产者在外部,subscribe 只是注册监听:
javascript// Hot:生产者已存在,subscribe 只是往里注册回调 const hot$ = new Observable(subscriber => { externalSource.addListener(data => subscriber.next(data)); // 所有订阅者监听同一个 externalSource });
所以 Cold → Hot 的本质就是把内部生产者提到外部,让多个订阅者共享。
Cold 转 Hot 的三种方式
share()——最常用
share() 内部使用 Subject 实现多播,并且带 refCount 机制:当订阅者数从 1 降到 0 时自动断开上游,再有新订阅者时重新连接。
javascriptimport { interval } from 'rxjs'; import { share, take } from 'rxjs/operators'; const source$ = interval(1000).pipe(take(5)); const shared$ = source$.pipe(share()); shared$.subscribe(v => console.log('A:', v)); setTimeout(() => shared$.subscribe(v => console.log('B:', v)), 2000); // A 和 B 共享同一个 interval 计时器 // B 在第2秒加入,只能收到 2、3、4
shareReplay(n)——缓存最近 n 个值
shareReplay 在 share 的基础上缓存最近的 n 个值,新订阅者能立即收到缓存数据,解决"来晚了错过数据"的问题。
javascriptimport { interval } from 'rxjs'; import { shareReplay, take } from 'rxjs/operators'; const source$ = interval(1000).pipe(take(5), shareReplay(1)); source$.subscribe(v => console.log('A:', v)); setTimeout(() => { source$.subscribe(v => console.log('B:', v)); // B 立即收到缓存的最新的一个值,然后继续接收后续值 }, 3000);
关键区别:share() 的 refCount 在订阅者归零后断开上游,而 shareReplay() 默认不会断开(可通过 config.resetOnComplete 等参数调整)。
publish() + connect()——手动控制
publish() 把 Cold Observable 变成 ConnectableObservable,必须手动调用 connect() 才开始执行。适合需要先注册所有订阅者再启动数据流的场景。
javascriptimport { interval } from 'rxjs'; import { publish, take } from 'rxjs/operators'; const source$ = interval(1000).pipe(take(5), publish()); source$.subscribe(v => console.log('A:', v)); source$.subscribe(v => console.log('B:', v)); // 此时不执行,等所有订阅者就绪 source$.connect(); // 手动启动
实际开发中的选择
用 Cold 的场景
- HTTP 请求:每个组件独立获取数据,互不干扰
- 独立计算:每个订阅者需要各自的处理结果
- 可重复执行:每次订阅都希望从头获取完整数据
用 Hot 的场景
- 共享 HTTP 结果:多个组件需要同一接口的数据,用
shareReplay(1)避免重复请求 - 事件监听:DOM 事件、WebSocket 消息天然就是多播的
- 状态管理:
BehaviorSubject持有最新状态,新订阅者立即获取当前值
最容易踩的坑
坑1:忘记共享导致重复请求
javascript// 每次订阅都发新请求——大忌 const data$ = http.get('/api/data'); data$.subscribe(handle1); data$.subscribe(handle2); // 又发了一次请求 // 用 shareReplay 共享 const data$ = http.get('/api/data').pipe(shareReplay(1)); data$.subscribe(handle1); data$.subscribe(handle2); // 只发一次请求
坑2:share() 的 refCount 陷阱
javascriptconst source$ = interval(1000).pipe(share()); const sub1 = source$.subscribe(v => console.log('A:', v)); const sub2 = source$.subscribe(v => console.log('B:', v)); sub1.unsubscribe(); sub2.unsubscribe(); // 所有订阅者都取消后,上游停止 source$.subscribe(v => console.log('C:', v)); // 重新订阅,上游重新连接,C 从0开始收数据 // 如果这里用 shareReplay(1),行为可能不同
坑3:shareReplay 缓存过多
javascript// 缓存1000个值,内存会爆 interval(1000).pipe(shareReplay(1000)); // 通常缓存1个就够了 interval(1000).pipe(shareReplay(1));
一张表总结
| 特性 | Cold Observable | Hot Observable |
|---|---|---|
| 生产者创建时机 | 订阅时 | Observable 创建时 |
| 数据流 | 每个订阅者独立 | 所有订阅者共享 |
| 传播方式 | 单播(Unicast) | 多播(Multicast) |
| 错过数据 | 不会,每次从头 | 会,只能收订阅后的数据 |
| 典型代表 | of、from、HTTP | Subject、DOM 事件 |
| 转 Hot | share()、shareReplay() | 不可转 Cold |
记住核心判断:看生产者——订阅时创建就是 Cold,早就存在就是 Hot。 面试中如果能从生产者角度解释单播/多播的区别,再提到 share 的 refCount 机制和 shareReplay 的缓存策略,基本就能拿到高分。