5月27日 21:29
RxJS 中如何处理背压(Backpressure)问题?
什么是背压
RxJS 中 Observable 默认是推模型——生产者决定发射节奏。当数据产生速度超过消费者处理速度时,未处理的值会堆积在内存中,这就是背压问题。典型场景包括:高频 DOM 事件、WebSocket 消息流、快速轮询的传感器数据。
RxJS 不像 RxJava 有 Flowable 这种原生背压类型,它的策略是"丢、缓、换"三种思路。
丢:节流与采样
不需要每一条数据时,主动丢弃多余值:
- throttleTime:时间窗口内只取第一个,适合 scroll/resize 等高频事件
- debounceTime:等输入静止后再发射,适合搜索框联想
- sampleTime:周期性取最新值,中间值全部丢弃
- auditTime:周期性取窗口末尾值,与 sample 语义不同
typescript// 滚动事件:200ms 内只处理一次 fromEvent(window, 'scroll').pipe( throttleTime(200) ) // 搜索框:停止输入 300ms 后请求 fromEvent(input, 'input').pipe( debounceTime(300), switchMap(e => search(e.target.value)) )
关键区别:throttle 保证有规律地采样,debounce 保证只在"安静"后触发,面试中经常要求区分二者。
缓:批量打包
需要全部数据但可以延迟处理时,把值攒起来一起发:
- bufferTime / bufferCount:按时间或数量打包成数组
- windowTime / windowCount:类似 buffer,但输出内层 Observable 而非数组
typescript// 每 500ms 打包一次传感器数据 sensor$.pipe(bufferTime(500)).subscribe(batch => { processBatch(batch); // 一次处理多条 });
buffer 的风险是缓冲区无限增长,生产速度持续超过消费速度时仍然会内存溢出。实际项目中建议配合 take 或设置上限。
换:切换并发模型
改变数据消费方式来匹配生产速度:
- concatMap:完全串行,一个结束再发下一个,最安全但最慢
- mergeMap(n):限制并发数为 n,兼顾吞吐和资源
- switchMap:只保留最新请求,自动取消前一个
typescript// 并发限制为 3 的批量请求 from(ids).pipe( mergeMap(id => fetchUser(id), 3) ) // 搜索场景:自动取消旧请求 input$.pipe( debounceTime(300), switchMap(query => searchAPI(query)) )
面试高频追问:mergeMap、concatMap、switchMap、exhaustMap 四者的区别。核心区分点在于——新值到来时,是排队(concat)、并行(merge)、取消旧的(switch)还是忽略新的(exhaust)。
实际选型思路
面试中更看重你能否根据场景选对策略,而不是背诵 API:
| 场景 | 推荐策略 | 理由 |
|---|---|---|
| 滚动/resize 事件 | throttleTime | 只需采样,丢数据无害 |
| 搜索输入联想 | debounceTime + switchMap | 避免无效请求,自动取消旧请求 |
| 批量 API 请求 | mergeMap(n) | 控制并发,兼顾效率 |
| 有序上传文件 | concatMap | 顺序保证,避免竞争 |
| 高频传感器数据 | bufferTime | 批量处理更高效 |
RxJS 没有真正的拉取式背压,本质上是"有损控制"。如果业务要求零丢数据,需要考虑在架构层面引入队列或换用支持 Flowable 的方案。