5月27日 21:29

RxJS 中如何处理背压(Backpressure)问题?

什么是背压

RxJS 中 Observable 默认是推模型——生产者决定发射节奏。当数据产生速度超过消费者处理速度时,未处理的值会堆积在内存中,这就是背压问题。典型场景包括:高频 DOM 事件、WebSocket 消息流、快速轮询的传感器数据。

RxJS 不像 RxJava 有 Flowable 这种原生背压类型,它的策略是"丢、缓、换"三种思路。

丢:节流与采样

不需要每一条数据时,主动丢弃多余值:

  • throttleTime:时间窗口内只取第一个,适合 scroll/resize 等高频事件
  • debounceTime:等输入静止后再发射,适合搜索框联想
  • sampleTime:周期性取最新值,中间值全部丢弃
  • auditTime:周期性取窗口末尾值,与 sample 语义不同
typescript
// 滚动事件:200ms 内只处理一次 fromEvent(window, 'scroll').pipe( throttleTime(200) ) // 搜索框:停止输入 300ms 后请求 fromEvent(input, 'input').pipe( debounceTime(300), switchMap(e => search(e.target.value)) )

关键区别:throttle 保证有规律地采样,debounce 保证只在"安静"后触发,面试中经常要求区分二者。

缓:批量打包

需要全部数据但可以延迟处理时,把值攒起来一起发:

  • bufferTime / bufferCount:按时间或数量打包成数组
  • windowTime / windowCount:类似 buffer,但输出内层 Observable 而非数组
typescript
// 每 500ms 打包一次传感器数据 sensor$.pipe(bufferTime(500)).subscribe(batch => { processBatch(batch); // 一次处理多条 });

buffer 的风险是缓冲区无限增长,生产速度持续超过消费速度时仍然会内存溢出。实际项目中建议配合 take 或设置上限。

换:切换并发模型

改变数据消费方式来匹配生产速度:

  • concatMap:完全串行,一个结束再发下一个,最安全但最慢
  • mergeMap(n):限制并发数为 n,兼顾吞吐和资源
  • switchMap:只保留最新请求,自动取消前一个
typescript
// 并发限制为 3 的批量请求 from(ids).pipe( mergeMap(id => fetchUser(id), 3) ) // 搜索场景:自动取消旧请求 input$.pipe( debounceTime(300), switchMap(query => searchAPI(query)) )

面试高频追问:mergeMap、concatMap、switchMap、exhaustMap 四者的区别。核心区分点在于——新值到来时,是排队(concat)、并行(merge)、取消旧的(switch)还是忽略新的(exhaust)。

实际选型思路

面试中更看重你能否根据场景选对策略,而不是背诵 API:

场景推荐策略理由
滚动/resize 事件throttleTime只需采样,丢数据无害
搜索输入联想debounceTime + switchMap避免无效请求,自动取消旧请求
批量 API 请求mergeMap(n)控制并发,兼顾效率
有序上传文件concatMap顺序保证,避免竞争
高频传感器数据bufferTime批量处理更高效

RxJS 没有真正的拉取式背压,本质上是"有损控制"。如果业务要求零丢数据,需要考虑在架构层面引入队列或换用支持 Flowable 的方案。

标签:Rxjs