Rxjs
一个 JavaScript 库,使用可观察量进行反应式编程,处理异步数据调用、回调和基于事件的程序。

查看更多相关内容
RxJS 中 Hot Observable 和 Cold Observable 有什么区别?## Hot Observable vs Cold Observable
### Cold Observable(冷 Observable)
**定义**: Cold Observable 是惰性的,每个订阅者都会独立执行 Observable 的逻辑。
**特点**:
- 每个订阅者都会获得独立的数据流
- 订阅时才开始执行
- 不共享数据
- 生产者不会主动推送数据
**示例**:
```javascript
import { Observable } from 'rxjs';
const cold$ = new Observable(subscriber => {
console.log('Observable executed');
subscriber.next(Math.random());
subscriber.complete();
});
cold$.subscribe(value => console.log('Subscriber 1:', value));
// Observable executed
// Subscriber 1: 0.123456
cold$.subscribe(value => console.log('Subscriber 2:', value));
// Observable executed
// Subscriber 2: 0.789012
// 注意:每次订阅都重新执行,产生不同的随机数
```
**常见的 Cold Observable**:
- `of()`
- `from()`
- `interval()`
- `timer()`
- `ajax()`
- `http.get()` (Angular)
- 大多数创建操作符
### Hot Observable(热 Observable)
**定义**: Hot Observable 是主动的,多个订阅者共享同一个数据流。
**特点**:
- 所有订阅者共享同一个数据流
- 即使没有订阅者也会执行
- 共享数据
- 生产者主动推送数据
**示例**:
```javascript
import { Observable, Subject } from 'rxjs';
const hot$ = new Observable(subscriber => {
console.log('Observable executed');
subscriber.next(Math.random());
subscriber.complete();
});
const subject = new Subject();
hot$.subscribe(subject);
subject.subscribe(value => console.log('Subscriber 1:', value));
// Observable executed
// Subscriber 1: 0.123456
subject.subscribe(value => console.log('Subscriber 2:', value));
// Subscriber 2: 0.123456
// 注意:两个订阅者收到相同的值
```
**常见的 Hot Observable**:
- `Subject` 及其变体
- `BehaviorSubject`
- `ReplaySubject`
- `AsyncSubject`
- DOM 事件(通过 `fromEvent`)
- WebSocket 连接
- `share()` 转换后的 Observable
## 转换方法
### 1. 使用 share() 将 Cold 转换为 Hot
```javascript
import { interval } from 'rxjs';
import { share, take } from 'rxjs/operators';
const cold$ = interval(1000).pipe(
take(5)
);
const hot$ = cold$.pipe(
share() // 转换为 Hot Observable
);
hot$.subscribe(value => console.log('Subscriber 1:', value));
hot$.subscribe(value => console.log('Subscriber 2:', value));
// 两个订阅者共享同一个数据流
```
### 2. 使用 shareReplay() 缓存值
```javascript
import { interval } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';
const hot$ = interval(1000).pipe(
take(5),
shareReplay(1) // 缓存最后一个值
);
hot$.subscribe(value => console.log('Subscriber 1:', value));
setTimeout(() => {
hot$.subscribe(value => console.log('Subscriber 2:', value));
// 新订阅者会立即收到缓存的值
}, 3000);
```
### 3. 使用 publish() 和 connect()
```javascript
import { interval } from 'rxjs';
import { publish, take } from 'rxjs/operators';
const cold$ = interval(1000).pipe(
take(5)
);
const hot$ = cold$.pipe(
publish() // 转换为 Hot Observable
);
hot$.subscribe(value => console.log('Subscriber 1:', value));
hot$.subscribe(value => console.log('Subscriber 2:', value));
hot$.connect(); // 开始执行
```
## 实际应用场景
### Cold Observable 适用场景
1. **HTTP 请求**
```javascript
// 每次订阅都会发起新的请求
http.get('/api/data').subscribe(data => {
console.log('Request 1:', data);
});
http.get('/api/data').subscribe(data => {
console.log('Request 2:', data);
});
```
2. **独立的数据处理**
```javascript
// 每个订阅者需要独立的数据流
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(value => console.log(value));
```
3. **需要重新执行的场景**
```javascript
// 每次订阅都重新计算
const calculation$ = new Observable(subscriber => {
const result = expensiveCalculation();
subscriber.next(result);
subscriber.complete();
});
```
### Hot Observable 适用场景
1. **共享数据**
```javascript
// 多个组件共享同一个数据流
const userData$ = http.get('/api/user').pipe(
share()
);
component1.userData$.subscribe(user => {
console.log('Component 1:', user);
});
component2.userData$.subscribe(user => {
console.log('Component 2:', user);
});
// 只发起一次请求,两个组件共享结果
```
2. **事件流**
```javascript
// 多个订阅者监听同一个事件
const click$ = fromEvent(document, 'click').pipe(
share()
);
click$.subscribe(event => {
console.log('Handler 1:', event);
});
click$.subscribe(event => {
console.log('Handler 2:', event);
});
```
3. **WebSocket 连接**
```javascript
// 多个订阅者共享同一个 WebSocket 连接
const socket$ = webSocket('ws://localhost:8080').pipe(
share()
);
socket$.subscribe(message => {
console.log('Handler 1:', message);
});
socket$.subscribe(message => {
console.log('Handler 2:', message);
});
```
## 性能对比
### Cold Observable 性能特点
**优点**:
- 每个订阅者获得独立的数据流
- 不会相互影响
- 适合需要独立处理的场景
**缺点**:
- 可能重复执行相同的操作
- 浪费资源(如重复的 HTTP 请求)
- 内存占用可能更高
### Hot Observable 性能特点
**优点**:
- 共享数据流,避免重复执行
- 节省资源(如只发起一次 HTTP 请求)
- 内存占用更低
**缺点**:
- 订阅者可能错过之前的数据
- 需要管理订阅时机
- 可能出现竞态条件
## 选择指南
### 使用 Cold Observable 当:
- 每个订阅者需要独立的数据流
- 需要重新执行操作
- 订阅者之间不应该相互影响
- 数据源是按需生成的
### 使用 Hot Observable 当:
- 多个订阅者需要共享数据
- 需要避免重复执行(如 HTTP 请求)
- 数据源是主动推送的(如事件、WebSocket)
- 需要缓存数据供后续订阅者使用
## 最佳实践
### 1. HTTP 请求共享
```javascript
// ❌ 错误:每次订阅都发起请求
class UserService {
getUser(id: string) {
return http.get(`/api/users/${id}`);
}
}
// ✅ 正确:共享请求结果
class UserService {
private cache = new Map<string, Observable<User>>();
getUser(id: string) {
if (!this.cache.has(id)) {
this.cache.set(id, http.get(`/api/users/${id}`).pipe(
shareReplay(1)
));
}
return this.cache.get(id)!;
}
}
```
### 2. 事件处理
```javascript
// 使用 share() 共享事件流
const resize$ = fromEvent(window, 'resize').pipe(
debounceTime(200),
share()
);
resize$.subscribe(event => {
updateLayout1(event);
});
resize$.subscribe(event => {
updateLayout2(event);
});
```
### 3. 状态管理
```javascript
// 使用 BehaviorSubject 管理状态
const state$ = new BehaviorSubject(initialState);
state$.subscribe(state => {
console.log('Listener 1:', state);
});
state$.subscribe(state => {
console.log('Listener 2:', state);
});
// 更新状态
state$.next(newState);
```
## 常见陷阱
### 1. 忘记共享导致重复请求
```javascript
// ❌ 错误示例
const data$ = http.get('/api/data');
data$.subscribe(data => console.log('Component 1:', data));
data$.subscribe(data => console.log('Component 2:', data));
// 发起两次请求
// ✅ 正确示例
const data$ = http.get('/api/data').pipe(
share()
);
data$.subscribe(data => console.log('Component 1:', data));
data$.subscribe(data => console.log('Component 2:', data));
// 只发起一次请求
```
### 2. 错误的共享时机
```javascript
// ❌ 错误示例
const data$ = http.get('/api/data').pipe(
share()
);
// 立即订阅触发请求
data$.subscribe();
// 后续订阅者可能错过数据
setTimeout(() => {
data$.subscribe(data => console.log(data));
}, 2000);
// ✅ 正确示例
const data$ = http.get('/api/data').pipe(
shareReplay(1) // 缓存数据
);
```
### 3. 不当使用 shareReplay
```javascript
// ❌ 错误示例:缓存过多数据
const data$ = interval(1000).pipe(
shareReplay(1000) // 缓存1000个值,占用大量内存
);
// ✅ 正确示例:合理设置缓存大小
const data$ = interval(1000).pipe(
shareReplay(1) // 只缓存最后一个值
);
```
## 总结
理解 Hot 和 Cold Observable 的区别对于编写高效的 RxJS 代码至关重要:
1. **Cold Observable**: 惰性、独立执行、适合按需生成的数据
2. **Hot Observable**: 主动、共享执行、适合主动推送的数据
3. **转换方法**: 使用 `share()`、`shareReplay()` 等操作符进行转换
4. **性能考虑**: Hot Observable 可以避免重复执行,提高性能
5. **选择原则**: 根据场景选择合适的类型,避免不必要的资源浪费
正确使用这两种 Observable 类型,可以显著提升应用的性能和可维护性。
前端 · 2026年2月21日 16:59
RxJS 中的 Marble Testing 是什么?如何使用?## Marble Testing 的概念
Marble Testing 是 RxJS 中一种基于字符串的可视化测试方法,它使用特殊的语法来表示 Observable 的时间流和事件。这种方法让异步测试变得直观和易于理解。
## Marble 语法
### 基本符号
| 符号 | 含义 |
|------|------|
| `-` | 时间流逝(1帧,约10ms)|
| `a`, `b`, `c` | 发出的值 |
| `|` | 完成 |
| `#` | 错误 |
| `()` | 同步发出 |
| `^` | 订阅点(hot Observable)|
| `!` | 取消订阅 |
### 示例
```javascript
// 基本示例
const source$ = cold('-a-b-c-|');
// 含义:10ms后发出a,20ms后发出b,30ms后发出c,40ms后完成
// 错误示例
const error$ = cold('-a-b-#');
// 含义:10ms后发出a,20ms后发出b,30ms后出错
// 同步示例
const sync$ = cold('(abc|)');
// 含义:同步发出a、b、c,然后完成
// Hot Observable
const hot$ = hot('^-a-b-c-|');
// 含义:从订阅点开始,10ms后发出a,20ms后发出b,30ms后发出c,40ms后完成
```
## TestScheduler 的使用
### 基本设置
```javascript
import { TestScheduler } from 'rxjs/testing';
describe('My Observable Tests', () => {
let testScheduler: TestScheduler;
beforeEach(() => {
testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
});
```
### 测试基本操作符
```javascript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
it('should map values', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c-|');
const expected = '-A-B-C-|';
const result$ = source$.pipe(
map(x => x.toUpperCase())
);
expectObservable(result$).toBe(expected, { a: 'a', b: 'b', c: 'c' });
});
});
it('should filter values', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c-d-|');
const expected = '-a-c---|';
const result$ = source$.pipe(
filter(x => ['a', 'c'].includes(x))
);
expectObservable(result$).toBe(expected);
});
});
```
### 测试时间相关操作符
```javascript
import { of } from 'rxjs';
import { delay, debounceTime, throttleTime } from 'rxjs/operators';
it('should delay emissions', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c-|');
const expected = '---a-b-c-|'; // 延迟30ms
const result$ = source$.pipe(
delay(30, testScheduler)
);
expectObservable(result$).toBe(expected);
});
});
it('should debounce', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a--b---c-|');
const expected = '-----b---c-|';
const result$ = source$.pipe(
debounceTime(20, testScheduler)
);
expectObservable(result$).toBe(expected);
});
});
it('should throttle', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c-d-|');
const expected = '-a---c---|';
const result$ = source$.pipe(
throttleTime(30, testScheduler)
);
expectObservable(result$).toBe(expected);
});
});
```
### 测试组合操作符
```javascript
import { of, merge, concat, combineLatest } from 'rxjs';
it('should merge observables', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source1$ = cold('-a---b-|');
const source2$ = cold('--c-d---|');
const expected = '-a-c-b-d-|';
const result$ = merge(source1$, source2$);
expectObservable(result$).toBe(expected);
});
});
it('should concatenate observables', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source1$ = cold('-a-b-|');
const source2$ = cold('--c-d-|');
const expected = '-a-b--c-d-|';
const result$ = concat(source1$, source2$);
expectObservable(result$).toBe(expected);
});
});
it('should combine latest', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source1$ = cold('-a---b-|');
const source2$ = cold('--c-d---|');
const expected = '----ab-bd-|';
const result$ = combineLatest([source1$, source2$]);
expectObservable(result$).toBe(expected);
});
});
```
### 测试错误处理
```javascript
import { of, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';
it('should catch errors', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-#');
const expected = '-a-b-(d|)';
const result$ = source$.pipe(
catchError(() => of('d'))
);
expectObservable(result$).toBe(expected);
});
});
it('should retry on error', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-#');
const expected = '-a-a-#';
const result$ = source$.pipe(
retry(1)
);
expectObservable(result$).toBe(expected);
});
});
```
### 测试订阅和取消订阅
```javascript
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
it('should handle subscription', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const source$ = cold('-a-b-c-|');
const subs = '^------!';
const result$ = source$.pipe(take(2));
expectObservable(result$).toBe('-a-b-|');
expectSubscriptions(source$.subscriptions).toBe(subs);
});
});
it('should handle unsubscription', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const source$ = cold('-a-b-c-d-|');
const subs = '^---!';
const result$ = source$.pipe(take(2));
expectObservable(result$).toBe('-a-b-|');
expectSubscriptions(source$.subscriptions).toBe(subs);
});
});
```
## 实际应用示例
### 1. 测试搜索功能
```javascript
import { of } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
function search(query: string) {
return of(`Results for ${query}`);
}
it('should search with debounce', () => {
testScheduler.run(({ cold, expectObservable }) => {
const input$ = cold('-a--b---c-|');
const expected = '-----b---c-|';
const result$ = input$.pipe(
debounceTime(20, testScheduler),
distinctUntilChanged(),
switchMap(query => search(query))
);
expectObservable(result$).toBe(expected);
});
});
```
### 2. 测试自动保存
```javascript
import { of } from 'rxjs';
import { debounceTime, switchMap } from 'rxjs/operators';
function save(data: any) {
return of('Saved');
}
it('should auto-save with debounce', () => {
testScheduler.run(({ cold, expectObservable }) => {
const changes$ = cold('-a--b---c-|');
const expected = '-----b---c-|';
const result$ = changes$.pipe(
debounceTime(20, testScheduler),
switchMap(data => save(data))
);
expectObservable(result$).toBe(expected);
});
});
```
### 3. 测试轮询功能
```javascript
import { interval } from 'rxjs';
import { take, map } from 'rxjs/operators';
it('should poll at intervals', () => {
testScheduler.run(({ cold, expectObservable }) => {
const expected = '-a-b-c-d-e-|';
const result$ = interval(10, testScheduler).pipe(
take(5),
map(x => String.fromCharCode(97 + x))
);
expectObservable(result$).toBe(expected);
});
});
```
### 4. 测试缓存功能
```javascript
import { of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';
it('should cache values', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source$ = cold('-a-b-c-|');
const expected = '-a-b-c-|';
const subs = ['^------!', ' ^-!'];
const cached$ = source$.pipe(shareReplay(1));
expectObservable(cached$).toBe(expected);
expectObservable(cached$).toBe('--c-|');
expectSubscriptions(source$.subscriptions).toBe(subs);
});
});
```
## 高级用法
### 1. 测试 Hot Observable
```javascript
it('should handle hot observable', () => {
testScheduler.run(({ hot, expectObservable }) => {
const source$ = hot('--a--b--c--|');
const sub = '---^--------!';
const expected = '--b--c--|';
const result$ = source$.pipe(take(2));
expectObservable(result$, sub).toBe(expected);
});
});
```
### 2. 测试多播
```javascript
import { of } from 'rxjs';
import { share, multicast } from 'rxjs/operators';
import { Subject } from 'rxjs';
it('should multicast correctly', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c-|');
const expected = '-a-b-c-|';
const shared$ = source$.pipe(share());
expectObservable(shared$).toBe(expected);
expectObservable(shared$).toBe(expected);
});
});
```
### 3. 测试自定义操作符
```javascript
import { Observable } from 'rxjs';
import { OperatorFunction } from 'rxjs';
function customMap<T, R>(project: (value: T) => R): OperatorFunction<T, R> {
return (source$) => new Observable(subscriber => {
return source$.subscribe({
next: value => {
try {
subscriber.next(project(value));
} catch (error) {
subscriber.error(error);
}
},
error: error => subscriber.error(error),
complete: () => subscriber.complete()
});
});
}
it('should use custom operator', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c-|');
const expected = '-A-B-C-|';
const result$ = source$.pipe(
customMap(x => x.toUpperCase())
);
expectObservable(result$).toBe(expected);
});
});
```
## 最佳实践
### 1. 使用有意义的值
```javascript
// ✅ 好的做法
it('should map values', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c-|');
const expected = '-A-B-C-|';
const result$ = source$.pipe(map(x => x.toUpperCase()));
expectObservable(result$).toBe(expected, {
a: 'a', b: 'b', c: 'c',
A: 'A', B: 'B', C: 'C'
});
});
});
// ❌ 不好的做法
it('should map values', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c-|');
const expected = '-d-e-f-|';
const result$ = source$.pipe(map(x => x.toUpperCase()));
expectObservable(result$).toBe(expected);
});
});
```
### 2. 测试边界情况
```javascript
it('should handle empty observable', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('|');
const expected = '|';
const result$ = source$.pipe(map(x => x.toUpperCase()));
expectObservable(result$).toBe(expected);
});
});
it('should handle error observable', () => {
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-#');
const expected = '-#';
const result$ = source$.pipe(map(x => x.toUpperCase()));
expectObservable(result$).toBe(expected);
});
});
```
### 3. 使用 expectSubscriptions
```javascript
it('should subscribe and unsubscribe correctly', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source$ = cold('-a-b-c-|');
const subs = '^------!';
const result$ = source$.pipe(take(3));
expectObservable(result$).toBe('-a-b-c-|');
expectSubscriptions(source$.subscriptions).toBe(subs);
});
});
```
## 总结
Marble Testing 是 RxJS 中强大的测试工具,它提供了:
1. **可视化测试**: 使用字符串表示时间流,直观易懂
2. **时间控制**: 精确控制异步操作的时序
3. **易于维护**: 清晰的语法和结构
4. **全面覆盖**: 可以测试各种操作符和场景
掌握 Marble Testing 可以显著提升 RxJS 代码的测试质量和开发效率。
服务端 · 2026年2月21日 16:58
RxJS 中 switchMap、mergeMap、concatMap 有什么区别?## switchMap、mergeMap、concatMap 的核心区别
这三个操作符都是用于处理高阶 Observable(Observable of Observable)的,但它们的处理策略完全不同:
### 1. switchMap
**特点**: 取消之前的内部 Observable,只处理最新的
**工作原理**:
- 当新的值到达时,取消之前未完成的 Observable
- 只保留最新的 Observable 的结果
- 适合需要取消旧请求的场景
**示例代码**:
```javascript
import { of, interval } from 'rxjs';
import { switchMap, take, delay } from 'rxjs/operators';
// 模拟异步请求
function makeRequest(id) {
return of(`Result ${id}`).pipe(delay(1000));
}
interval(500).pipe(
switchMap(id => makeRequest(id)),
take(5)
).subscribe(console.log);
// 输出: Result 4
// 解释: 因为每500ms触发一次,但请求需要1000ms
// 所以每次新请求来时,之前的请求都被取消了
// 最终只有最后一个请求完成了
```
**实际应用场景**:
- 搜索框输入:每次输入都取消上一次的搜索请求
- 自动完成:只显示最新输入的结果
- 导航切换:取消未完成的页面加载
```javascript
// 搜索框示例
searchInput.pipe(
switchMap(query => searchAPI(query))
).subscribe(results => {
// 只显示最新搜索的结果
displayResults(results);
});
```
### 2. mergeMap
**特点**: 并行处理所有内部 Observable
**工作原理**:
- 同时订阅所有内部 Observable
- 所有 Observable 的结果都会被发出
- 不保证顺序,结果可能交错
**示例代码**:
```javascript
import { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';
function makeRequest(id) {
return of(`Result ${id}`).pipe(delay(id * 200));
}
of(1, 2, 3).pipe(
mergeMap(id => makeRequest(id))
).subscribe(console.log);
// 输出: Result 1, Result 2, Result 3
// 解释: 所有请求并行执行,按完成顺序输出
```
**实际应用场景**:
- 并行加载多个资源
- 批量处理独立任务
- 不需要顺序的并发请求
```javascript
// 并行加载用户数据示例
merge(
getUserProfile(userId),
getUserPosts(userId),
getUserComments(userId)
).pipe(
mergeMap(response => response.json())
).subscribe(data => {
// 所有数据并行加载完成
updateUI(data);
});
```
### 3. concatMap
**特点**: 顺序处理内部 Observable,一个完成后再处理下一个
**工作原理**:
- 按顺序订阅内部 Observable
- 当前 Observable 完成后才订阅下一个
- 保证结果的顺序
**示例代码**:
```javascript
import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';
function makeRequest(id) {
return of(`Result ${id}`).pipe(delay(500));
}
of(1, 2, 3).pipe(
concatMap(id => makeRequest(id))
).subscribe(console.log);
// 输出: Result 1, Result 2, Result 3
// 解释: 每个请求按顺序执行,前一个完成后才执行下一个
```
**实际应用场景**:
- 需要保证顺序的请求
- 依赖前一个结果的后续请求
- 防止服务器过载
```javascript
// 顺序上传文件示例
files.pipe(
concatMap(file => uploadFile(file))
).subscribe(result => {
// 文件按顺序上传
console.log('Uploaded:', result);
});
```
## 对比总结
| 特性 | switchMap | mergeMap | concatMap |
|------|-----------|----------|-----------|
| 执行方式 | 取消旧的,只保留最新的 | 并行执行所有 | 顺序执行 |
| 结果顺序 | 只保留最新结果 | 不保证顺序 | 保证顺序 |
| 并发数 | 1(同时只有1个) | 无限制 | 1(同时只有1个) |
| 适用场景 | 搜索、自动完成 | 并行加载 | 顺序处理 |
| 性能 | 最快(取消旧请求) | 最快(并行) | 较慢(顺序) |
| 内存占用 | 低 | 高 | 低 |
## 选择指南
### 使用 switchMap 当:
- 需要取消旧请求
- 只关心最新结果
- 搜索、自动完成等场景
- 避免不必要的网络请求
### 使用 mergeMap 当:
- 需要并行处理
- 请求之间没有依赖
- 需要最大化性能
- 不关心结果顺序
### 使用 concatMap 当:
- 需要保证顺序
- 请求之间有依赖
- 需要限制并发数
- 避免服务器过载
## 性能考虑
```javascript
// 性能对比示例
const source = interval(100).pipe(take(10));
// switchMap: 只处理最后一个
source.pipe(
switchMap(x => of(x).pipe(delay(1000)))
).subscribe();
// mergeMap: 并行处理所有(可能造成性能问题)
source.pipe(
mergeMap(x => of(x).pipe(delay(1000)))
).subscribe();
// concatMap: 顺序处理(可能较慢)
source.pipe(
concatMap(x => of(x).pipe(delay(1000)))
).subscribe();
```
## 实际项目中的选择
```javascript
// 1. 搜索功能 - 使用 switchMap
searchInput.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => searchAPI(query))
).subscribe(results => displayResults(results));
// 2. 批量加载 - 使用 mergeMap
productIds.pipe(
mergeMap(id => getProductDetails(id))
).subscribe(products => renderProducts(products));
// 3. 顺序操作 - 使用 concatMap
commands.pipe(
concatMap(command => executeCommand(command))
).subscribe(result => logResult(result));
```
## 注意事项
1. **内存泄漏**: mergeMap 可能创建大量并发请求,需要注意内存使用
2. **取消逻辑**: switchMap 会自动取消,但 concatMap 和 mergeMap 不会
3. **错误处理**: 任何一个内部 Observable 出错都会导致整个流失败
4. **取消订阅**: 始终记得取消订阅以避免内存泄漏
## 最佳实践
```javascript
// 1. 结合错误处理
searchInput.pipe(
switchMap(query =>
searchAPI(query).pipe(
catchError(error => of([]))
)
)
).subscribe(results => displayResults(results));
// 2. 限制并发数(使用 mergeMap)
import { mergeMap } from 'rxjs/operators';
source.pipe(
mergeMap(value => process(value), 3) // 限制并发数为3
).subscribe();
// 3. 添加重试机制
source.pipe(
switchMap(value =>
apiCall(value).pipe(
retry(3),
catchError(error => of(defaultValue))
)
)
).subscribe();
```
前端 · 2026年2月21日 16:55
RxJS 中 Subject、BehaviorSubject、ReplaySubject 和 AsyncSubject 有什么区别?## Subject 的核心概念
Subject 是 RxJS 中一种特殊的 Observable,它既是 Observable 又是 Observer。这意味着它可以:
- 被多个观察者订阅
- 主动推送新值给所有订阅者
- 实现多播(multicast)功能
## Subject 类型
### 1. Subject
基础 Subject,每次有新订阅者时,不会回放之前的值
```javascript
const subject = new Subject();
subject.next(1);
subject.next(2);
const subscription1 = subject.subscribe(value => console.log('订阅者1:', value));
// 订阅者1: 3
// 订阅者1: 4
subject.next(3);
subject.next(4);
const subscription2 = subject.subscribe(value => console.log('订阅者2:', value));
// 订阅者2: 5
// 订阅者2: 6
subject.next(5);
subject.next(6);
```
### 2. BehaviorSubject
BehaviorSubject 会记住最新的值,新订阅者会立即接收到当前值
```javascript
const behaviorSubject = new BehaviorSubject('初始值');
behaviorSubject.subscribe(value => console.log('订阅者1:', value));
// 订阅者1: 初始值
behaviorSubject.next('值1');
// 订阅者1: 值1
behaviorSubject.subscribe(value => console.log('订阅者2:', value));
// 订阅者2: 值1 - 立即收到最新值
behaviorSubject.next('值2');
// 订阅者1: 值2
// 订阅者2: 值2
```
### 3. ReplaySubject
ReplaySubject 可以回放指定数量的历史值
```javascript
const replaySubject = new ReplaySubject(2); // 回放最后2个值
replaySubject.next('值1');
replaySubject.next('值2');
replaySubject.next('值3');
replaySubject.subscribe(value => console.log('订阅者1:', value));
// 订阅者1: 值2
// 订阅者1: 值3
replaySubject.next('值4');
// 订阅者1: 值4
replaySubject.subscribe(value => console.log('订阅者2:', value));
// 订阅者2: 值3
// 订阅者2: 值4
```
### 4. AsyncSubject
AsyncSubject 只在完成时发出最后一个值
```javascript
const asyncSubject = new AsyncSubject();
asyncSubject.subscribe(value => console.log('订阅者1:', value));
asyncSubject.next('值1');
asyncSubject.next('值2');
asyncSubject.next('值3');
// 此时还没有输出
asyncSubject.complete();
// 订阅者1: 值3 - 只发出最后一个值
asyncSubject.subscribe(value => console.log('订阅者2:', value));
// 订阅者2: 值3 - 新订阅者也能收到最后一个值
```
## 实际应用场景
### Subject 使用场景
- 事件总线
- 状态管理
- 多个组件共享数据流
### BehaviorSubject 使用场景
- 存储当前状态
- 表单状态管理
- 用户信息管理
### ReplaySubject 使用场景
- 缓存历史数据
- 重播操作日志
- 错误重试机制
### AsyncSubject 使用场景
- HTTP 请求缓存
- 只需要最终结果的计算
- 异步操作的最后值
## 与普通 Observable 的区别
```javascript
// 普通 Observable - 单播
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
});
observable.subscribe(v => console.log('观察者1:', v));
observable.subscribe(v => console.log('观察者2:', v));
// 每个订阅者独立执行
// Subject - 多播
const subject = new Subject();
const source = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
});
source.subscribe(subject);
subject.subscribe(v => console.log('订阅者1:', v));
subject.subscribe(v => console.log('订阅者2:', v));
// 所有订阅者共享同一个数据流
```
## 性能考虑
Subject 在多播场景下可以提高性能,避免重复执行相同的异步操作。但需要注意内存泄漏问题,及时取消订阅。
前端 · 2026年2月21日 16:55
RxJS 中如何进行性能优化?## RxJS 性能优化的核心原则
RxJS 性能优化主要关注以下几个方面:
- 减少不必要的订阅和取消订阅
- 优化操作符链的执行效率
- 合理使用多播和缓存
- 避免内存泄漏
- 减少计算开销
## 优化策略
### 1. 使用 share 和 shareReplay
避免重复执行相同的 Observable。
```javascript
import { of } from 'rxjs';
import { share, shareReplay } from 'rxjs/operators';
// ❌ 不好的做法:每次订阅都重新执行
const data$ = http.get('/api/data');
data$.subscribe(data => console.log('Subscriber 1:', data));
data$.subscribe(data => console.log('Subscriber 2:', data));
// 发起两次 HTTP 请求
// ✅ 好的做法:共享 Observable
const shared$ = http.get('/api/data').pipe(
share() // 或 shareReplay(1)
);
shared$.subscribe(data => console.log('Subscriber 1:', data));
shared$.subscribe(data => console.log('Subscriber 2:', data));
// 只发起一次 HTTP 请求
```
### 2. 使用 debounceTime 和 throttleTime
减少高频事件的处理频率。
```javascript
import { fromEvent } from 'rxjs';
import { debounceTime, throttleTime } from 'rxjs/operators';
// ❌ 不好的做法:处理每个滚动事件
fromEvent(window, 'scroll').subscribe(event => {
handleScroll(event); // 可能每秒触发数百次
});
// ✅ 好的做法:节流处理
fromEvent(window, 'scroll').pipe(
throttleTime(200) // 每 200ms 最多处理一次
).subscribe(event => {
handleScroll(event);
});
// ✅ 好的做法:防抖处理
fromEvent(searchInput, 'input').pipe(
debounceTime(300) // 停止输入 300ms 后才处理
).subscribe(event => {
search(event.target.value);
});
```
### 3. 使用 distinctUntilChanged
避免处理重复的值。
```javascript
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';
// ❌ 不好的做法:可能处理相同的搜索词
fromEvent(searchInput, 'input').pipe(
debounceTime(300)
).subscribe(event => {
search(event.target.value);
});
// ✅ 好的做法:只在值变化时处理
fromEvent(searchInput, 'input').pipe(
debounceTime(300),
distinctUntilChanged() // 避免重复搜索
).subscribe(event => {
search(event.target.value);
});
```
### 4. 使用 take 和 takeWhile
及时取消不再需要的订阅。
```javascript
import { interval } from 'rxjs';
import { take, takeWhile } from 'rxjs/operators';
// ❌ 不好的做法:无限订阅
interval(1000).subscribe(value => {
console.log(value);
// 需要手动取消订阅
});
// ✅ 好的做法:自动取消订阅
interval(1000).pipe(
take(10) // 只取 10 个值
).subscribe(value => {
console.log(value);
});
// ✅ 好的做法:条件取消
interval(1000).pipe(
takeWhile(value => value < 10)
).subscribe(value => {
console.log(value);
});
```
### 5. 使用 switchMap 而不是 mergeMap
在需要取消旧操作的场景中。
```javascript
import { fromEvent } from 'rxjs';
import { switchMap, mergeMap } from 'rxjs/operators';
// ❌ 不好的做法:所有请求都会完成
fromEvent(searchInput, 'input').pipe(
debounceTime(300),
mergeMap(query => searchAPI(query)) // 所有请求都会完成
).subscribe(results => {
displayResults(results);
});
// ✅ 好的做法:取消旧请求
fromEvent(searchInput, 'input').pipe(
debounceTime(300),
switchMap(query => searchAPI(query)) // 取消旧请求
).subscribe(results => {
displayResults(results);
});
```
### 6. 使用 buffer 和 bufferTime
批量处理数据,减少处理次数。
```javascript
import { interval } from 'rxjs';
import { bufferTime } from 'rxjs/operators';
// ❌ 不好的做法:逐个处理
interval(100).pipe(
take(100)
).subscribe(value => {
processItem(value); // 处理 100 次
});
// ✅ 好的做法:批量处理
interval(100).pipe(
take(100),
bufferTime(1000) // 每秒批量处理
).subscribe(buffer => {
processBatch(buffer); // 只处理 10 次
});
```
### 7. 使用 filter 提前过滤
尽早过滤掉不需要的数据。
```javascript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// ❌ 不好的做法:先处理再过滤
of(1, 2, 3, 4, 5).pipe(
map(x => {
console.log('Processing:', x); // 处理所有值
return x * 2;
}),
filter(x => x > 5)
).subscribe(value => {
console.log('Result:', value);
});
// ✅ 好的做法:先过滤再处理
of(1, 2, 3, 4, 5).pipe(
filter(x => x > 2), // 先过滤
map(x => {
console.log('Processing:', x); // 只处理需要的值
return x * 2;
})
).subscribe(value => {
console.log('Result:', value);
});
```
### 8. 使用 finalize 清理资源
确保资源被正确释放。
```javascript
import { interval } from 'rxjs';
import { take, finalize } from 'rxjs/operators';
// ❌ 不好的做法:可能忘记清理
const subscription = interval(1000).pipe(
take(10)
).subscribe(value => {
console.log(value);
});
// 可能忘记取消订阅
// ✅ 好的做法:自动清理
interval(1000).pipe(
take(10),
finalize(() => {
console.log('Cleaning up...');
cleanupResources();
})
).subscribe(value => {
console.log(value);
});
```
## 高级优化技巧
### 1. 使用 combineLatest 而不是嵌套订阅
```javascript
// ❌ 不好的做法:嵌套订阅
this.userService.getUser(userId).subscribe(user => {
this.postsService.getPosts(user.id).subscribe(posts => {
this.commentsService.getComments(posts[0].id).subscribe(comments => {
// 处理数据
});
});
});
// ✅ 好的做法:使用 combineLatest
combineLatest([
this.userService.getUser(userId),
this.postsService.getPosts(userId),
this.commentsService.getComments(postId)
]).pipe(
map(([user, posts, comments]) => ({
user,
posts,
comments
}))
).subscribe(data => {
// 处理数据
});
```
### 2. 使用 mergeMap 限制并发
```javascript
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
// ❌ 不好的做法:可能同时发起大量请求
const ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
from(ids).pipe(
mergeMap(id => fetchData(id)) // 可能同时发起 10 个请求
).subscribe(result => {
console.log(result);
});
// ✅ 好的做法:限制并发数
from(ids).pipe(
mergeMap(id => fetchData(id), 3) // 最多同时 3 个请求
).subscribe(result => {
console.log(result);
});
```
### 3. 使用 ReplaySubject 缓存数据
```javascript
import { ReplaySubject } from 'rxjs';
// ❌ 不好的做法:每次都重新获取
class DataService {
getData() {
return http.get('/api/data');
}
}
// ✅ 好的做法:缓存数据
class DataService {
private cache$ = new ReplaySubject(1);
getData() {
if (!this.hasFetched) {
http.get('/api/data').subscribe(data => {
this.cache$.next(data);
this.hasFetched = true;
});
}
return this.cache$.asObservable();
}
}
```
### 4. 使用 scan 而不是 reduce
scan 可以持续发出中间结果,而 reduce 只在完成时发出结果。
```javascript
import { of } from 'rxjs';
import { scan, reduce } from 'rxjs/operators';
// ❌ 不好的做法:只在完成时得到结果
of(1, 2, 3, 4, 5).pipe(
reduce((sum, value) => sum + value, 0)
).subscribe(sum => {
console.log('Final sum:', sum); // 只输出一次
});
// ✅ 好的做法:持续输出中间结果
of(1, 2, 3, 4, 5).pipe(
scan((sum, value) => sum + value, 0)
).subscribe(sum => {
console.log('Current sum:', sum); // 输出 5 次
});
```
### 5. 使用 tap 进行调试
```javascript
import { of } from 'rxjs';
import { map, filter, tap } from 'rxjs/operators';
// ✅ 好的做法:使用 tap 调试
of(1, 2, 3, 4, 5).pipe(
tap(value => console.log('Input:', value)),
filter(x => x > 2),
tap(value => console.log('After filter:', value)),
map(x => x * 2),
tap(value => console.log('After map:', value))
).subscribe(value => {
console.log('Output:', value);
});
```
## 性能监控
### 1. 使用 performance.now() 测量性能
```javascript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';
const startTime = performance.now();
of(1, 2, 3, 4, 5).pipe(
map(x => x * 2),
tap(() => {
const elapsed = performance.now() - startTime;
console.log(`Elapsed: ${elapsed.toFixed(2)}ms`);
})
).subscribe();
```
### 2. 使用 count 统计数据量
```javascript
import { of } from 'rxjs';
import { count, tap } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
tap(value => console.log('Processing:', value)),
count()
).subscribe(count => {
console.log('Total processed:', count);
});
```
## 常见性能问题
### 1. 过多的订阅
```javascript
// ❌ 不好的做法:创建多个订阅
const data$ = http.get('/api/data');
data$.subscribe(data => updateUI1(data));
data$.subscribe(data => updateUI2(data));
data$.subscribe(data => updateUI3(data));
// ✅ 好的做法:共享订阅
const data$ = http.get('/api/data').pipe(
share()
);
data$.subscribe(data => updateUI1(data));
data$.subscribe(data => updateUI2(data));
data$.subscribe(data => updateUI3(data));
```
### 2. 内存泄漏
```javascript
// ❌ 不好的做法:忘记取消订阅
class MyComponent {
ngOnInit() {
this.dataService.getData().subscribe(data => {
this.data = data;
});
}
// 组件销毁时订阅仍然存在
}
// ✅ 好的做法:正确取消订阅
class MyComponent implements OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
this.dataService.getData().pipe(
takeUntil(this.destroy$)
).subscribe(data => {
this.data = data;
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
```
### 3. 不必要的计算
```javascript
// ❌ 不好的做法:重复计算
of(1, 2, 3).pipe(
map(x => {
const result = expensiveCalculation(x);
return result;
}),
map(x => {
const result = expensiveCalculation(x); // 重复计算
return result;
})
).subscribe();
// ✅ 好的做法:避免重复计算
of(1, 2, 3).pipe(
map(x => {
const result = expensiveCalculation(x);
return { value: x, result };
})
).subscribe(data => {
console.log(data.value, data.result);
});
```
## 最佳实践
### 1. 使用 AsyncPipe
```typescript
@Component({
template: `
<div *ngIf="data$ | async as data">
{{ data }}
</div>
`
})
export class MyComponent {
data$ = this.service.getData();
// AsyncPipe 自动管理订阅
}
```
### 2. 使用 takeUntil
```typescript
export class MyComponent implements OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
this.service.getData().pipe(
takeUntil(this.destroy$)
).subscribe(data => {
this.data = data;
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
```
### 3. 使用 shareReplay
```typescript
@Injectable()
export class DataService {
private cache = new Map<string, Observable<any>>();
getData(key: string) {
if (!this.cache.has(key)) {
this.cache.set(key, http.get(`/api/data/${key}`).pipe(
shareReplay(1)
));
}
return this.cache.get(key)!;
}
}
```
## 总结
RxJS 性能优化的关键点:
1. **共享订阅**: 使用 share 和 shareReplay 避免重复执行
2. **减少处理频率**: 使用 debounceTime 和 throttleTime
3. **过滤重复数据**: 使用 distinctUntilChanged
4. **及时取消订阅**: 使用 take 和 takeWhile
5. **选择合适的操作符**: switchMap vs mergeMap vs concatMap
6. **批量处理**: 使用 buffer 和 bufferTime
7. **提前过滤**: 使用 filter 尽早过滤不需要的数据
8. **清理资源**: 使用 finalize 确保资源释放
9. **避免嵌套订阅**: 使用 combineLatest 和 forkJoin
10. **限制并发**: 使用 mergeMap 的并发参数
掌握这些优化技巧可以显著提升 RxJS 应用的性能。
前端 · 2026年2月21日 16:54
RxJS 在 Angular 中如何应用?## RxJS 在 Angular 中的应用
RxJS 是 Angular 框架的核心依赖,广泛应用于异步操作、事件处理和数据流管理。
## 核心应用场景
### 1. HTTP 请求
Angular 的 HttpClient 返回 Observable,便于处理异步请求。
```typescript
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class DataService {
constructor(private http: HttpClient) {}
getUsers(): Observable<User[]> {
return this.http.get<User[]>('/api/users');
}
getUserById(id: number): Observable<User> {
return this.http.get<User>(`/api/users/${id}`);
}
createUser(user: User): Observable<User> {
return this.http.post<User>('/api/users', user);
}
updateUser(id: number, user: User): Observable<User> {
return this.http.put<User>(`/api/users/${id}`, user);
}
deleteUser(id: number): Observable<void> {
return this.http.delete<void>(`/api/users/${id}`);
}
}
```
**在组件中使用**:
```typescript
import { Component, OnInit } from '@angular/core';
import { DataService } from './data.service';
@Component({
selector: 'app-user-list',
template: `
<div *ngIf="users$ | async as users">
<div *ngFor="let user of users">
{{ user.name }}
</div>
</div>
`
})
export class UserListComponent implements OnInit {
users$: Observable<User[]>;
constructor(private dataService: DataService) {}
ngOnInit() {
this.users$ = this.dataService.getUsers();
}
}
```
### 2. 表单处理
Angular 的响应式表单与 RxJS 完美集成。
```typescript
import { Component, OnInit } from '@angular/core';
import { FormBuilder, FormGroup, Validators } from '@angular/forms';
@Component({
selector: 'app-search-form',
template: `
<form [formGroup]="searchForm">
<input formControlName="search" placeholder="Search...">
</form>
`
})
export class SearchFormComponent implements OnInit {
searchForm: FormGroup;
constructor(private fb: FormBuilder) {
this.searchForm = this.fb.group({
search: ['', Validators.minLength(3)]
});
}
ngOnInit() {
// 监听搜索输入
this.searchForm.get('search')?.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
filter(query => query.length >= 3),
switchMap(query => this.search(query))
).subscribe(results => {
this.displayResults(results);
});
}
search(query: string): Observable<SearchResult[]> {
return this.http.get<SearchResult[]>(`/api/search?q=${query}`);
}
displayResults(results: SearchResult[]) {
// 显示搜索结果
}
}
```
### 3. 路由处理
使用 RxJS 处理路由参数和查询参数。
```typescript
import { Component, OnInit } from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
@Component({
selector: 'app-user-detail',
template: `
<div *ngIf="user$ | async as user">
<h1>{{ user.name }}</h1>
<p>{{ user.email }}</p>
</div>
`
})
export class UserDetailComponent implements OnInit {
user$: Observable<User>;
constructor(
private route: ActivatedRoute,
private router: Router,
private dataService: DataService
) {}
ngOnInit() {
// 监听路由参数变化
this.user$ = this.route.paramMap.pipe(
switchMap(params => {
const id = Number(params.get('id'));
return this.dataService.getUserById(id);
})
);
}
navigateToUser(id: number) {
this.router.navigate(['/users', id]);
}
}
```
### 4. 状态管理
使用 BehaviorSubject 或 NgRx 进行状态管理。
**简单状态管理**:
```typescript
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class StateService {
private state$ = new BehaviorSubject<AppState>({
user: null,
isLoading: false,
error: null
});
getState(): Observable<AppState> {
return this.state$.asObservable();
}
updateUser(user: User) {
const currentState = this.state$.value;
this.state$.next({ ...currentState, user });
}
setLoading(loading: boolean) {
const currentState = this.state$.value;
this.state$.next({ ...currentState, isLoading: loading });
}
}
```
**在组件中使用**:
```typescript
@Component({
selector: 'app-app',
template: `
<div *ngIf="state$ | async as state">
<div *ngIf="state.isLoading">Loading...</div>
<div *ngIf="state.user">Welcome, {{ state.user.name }}</div>
</div>
`
})
export class AppComponent {
state$: Observable<AppState>;
constructor(private stateService: StateService) {
this.state$ = this.stateService.getState();
}
}
```
## 高级应用模式
### 1. 使用 AsyncPipe
AsyncPipe 自动管理订阅和取消订阅。
```typescript
@Component({
selector: 'app-user-list',
template: `
<div *ngIf="users$ | async as users">
<div *ngFor="let user of users">
{{ user.name }}
</div>
</div>
`
})
export class UserListComponent {
users$: Observable<User[]>;
constructor(private dataService: DataService) {
this.users$ = this.dataService.getUsers();
}
}
```
### 2. 使用 takeUntil 防止内存泄漏
```typescript
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
@Component({
selector: 'app-component',
template: `...`
})
export class MyComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
this.dataService.getUsers().pipe(
takeUntil(this.destroy$)
).subscribe(users => {
this.users = users;
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
```
### 3. 使用 shareReplay 缓存数据
```typescript
import { Injectable } from '@angular/core';
import { Observable, shareReplay } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class CacheService {
private cache = new Map<string, Observable<any>>();
get<T>(key: string, fetchFn: () => Observable<T>): Observable<T> {
if (!this.cache.has(key)) {
this.cache.set(key, fetchFn().pipe(
shareReplay(1)
));
}
return this.cache.get(key) as Observable<T>;
}
clear() {
this.cache.clear();
}
}
```
### 4. 使用 combineLatest 组合多个数据源
```typescript
@Component({
selector: 'app-dashboard',
template: `
<div *ngIf="dashboardData$ | async as data">
<h2>Users: {{ data.users.length }}</h2>
<h2>Posts: {{ data.posts.length }}</h2>
<h2>Comments: {{ data.comments.length }}</h2>
</div>
`
})
export class DashboardComponent {
dashboardData$: Observable<DashboardData>;
constructor(private dataService: DataService) {
this.dashboardData$ = combineLatest([
this.dataService.getUsers(),
this.dataService.getPosts(),
this.dataService.getComments()
]).pipe(
map(([users, posts, comments]) => ({
users,
posts,
comments
}))
);
}
}
```
## 常见问题和解决方案
### 1. 处理错误
```typescript
this.dataService.getUsers().pipe(
catchError(error => {
console.error('Failed to load users:', error);
return of([]); // 返回空数组作为降级
})
).subscribe(users => {
this.users = users;
});
```
### 2. 重试失败的请求
```typescript
this.dataService.getUsers().pipe(
retry(3), // 重试 3 次
catchError(error => {
console.error('Failed after retries:', error);
return of([]);
})
).subscribe(users => {
this.users = users;
});
```
### 3. 加载状态管理
```typescript
@Component({
selector: 'app-user-list',
template: `
<div *ngIf="isLoading">Loading...</div>
<div *ngIf="users$ | async as users">
<div *ngFor="let user of users">
{{ user.name }}
</div>
</div>
`
})
export class UserListComponent {
isLoading = false;
users$: Observable<User[]>;
constructor(private dataService: DataService) {}
loadUsers() {
this.isLoading = true;
this.dataService.getUsers().pipe(
finalize(() => {
this.isLoading = false;
})
).subscribe(users => {
this.users = users;
});
}
}
```
### 4. 搜索防抖
```typescript
@Component({
selector: 'app-search',
template: `
<input #searchInput (input)="onSearch($event)" placeholder="Search...">
<div *ngIf="results$ | async as results">
<div *ngFor="let result of results">
{{ result.name }}
</div>
</div>
`
})
export class SearchComponent {
results$: Observable<SearchResult[]>;
constructor(private dataService: DataService) {}
onSearch(event: Event) {
const query = (event.target as HTMLInputElement).value;
this.results$ = of(query).pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(q => this.dataService.search(q))
);
}
}
```
## 最佳实践
### 1. 使用 AsyncPipe
```typescript
// ✅ 推荐
@Component({
template: `<div *ngIf="data$ | async as data">{{ data }}</div>`
})
export class MyComponent {
data$ = this.service.getData();
}
// ❌ 不推荐
@Component({
template: `<div>{{ data }}</div>`
})
export class MyComponent implements OnInit, OnDestroy {
data: any;
private subscription: Subscription;
ngOnInit() {
this.subscription = this.service.getData().subscribe(data => {
this.data = data;
});
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
```
### 2. 防止内存泄漏
```typescript
// ✅ 推荐
export class MyComponent implements OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
this.service.getData().pipe(
takeUntil(this.destroy$)
).subscribe(data => {
this.data = data;
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
// ❌ 不推荐
export class MyComponent {
ngOnInit() {
this.service.getData().subscribe(data => {
this.data = data;
});
// 忘记取消订阅
}
}
```
### 3. 错误处理
```typescript
// ✅ 推荐
this.service.getData().pipe(
catchError(error => {
console.error('Error:', error);
return of(defaultData);
})
).subscribe(data => {
this.data = data;
});
// ❌ 不推荐
this.service.getData().subscribe({
next: data => {
this.data = data;
},
error: error => {
console.error('Error:', error);
// 没有降级处理
}
});
```
### 4. 类型安全
```typescript
// ✅ 推荐
interface User {
id: number;
name: string;
email: string;
}
this.http.get<User[]>('/api/users').subscribe(users => {
users.forEach(user => {
console.log(user.name); // 类型安全
});
});
// ❌ 不推荐
this.http.get('/api/users').subscribe((users: any) => {
users.forEach((user: any) => {
console.log(user.name); // 没有类型检查
});
});
```
## 总结
RxJS 在 Angular 中的关键应用:
1. **HTTP 请求**: 使用 HttpClient 处理异步请求
2. **表单处理**: 监听表单值变化,实现防抖和验证
3. **路由处理**: 监听路由参数和查询参数变化
4. **状态管理**: 使用 BehaviorSubject 或 NgRx 管理应用状态
5. **AsyncPipe**: 自动管理订阅,防止内存泄漏
6. **错误处理**: 使用 catchError 和 retry 处理错误
7. **性能优化**: 使用 debounceTime、shareReplay 等优化性能
掌握 RxJS 在 Angular 中的应用是成为 Angular 高级开发者的关键。
服务端 · 2026年2月21日 16:54
RxJS 6 和 RxJS 7 有什么区别?## RxJS 6 与 RxJS 7 的主要变化
### 1. 导入方式的变化
**RxJS 6**:
```javascript
// 创建操作符
import { of, from, interval } from 'rxjs';
// 操作符(使用 pipe)
import { map, filter, switchMap } from 'rxjs/operators';
// 工具函数
import { Subscription } from 'rxjs';
```
**RxJS 7**:
```javascript
// 导入方式基本相同,但更加模块化
import { of, from, interval } from 'rxjs';
import { map, filter, switchMap } from 'rxjs/operators';
// 新增了一些操作符
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';
```
### 2. 新增的操作符
**RxJS 7 新增了多个有用的操作符**:
#### 1. `partition`
根据谓词函数将 Observable 分成两个。
```javascript
import { of, partition } from 'rxjs';
const source$ = of(1, 2, 3, 4, 5, 6);
const [evens$, odds$] = partition(source$, x => x % 2 === 0);
evens$.subscribe(x => console.log('Even:', x)); // 2, 4, 6
odds$.subscribe(x => console.log('Odd:', x)); // 1, 3, 5
```
#### 2. `tap` 的改进
`tap` 现在接受一个对象,可以分别处理不同的通知类型。
```javascript
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';
of(1, 2, 3).pipe(
tap({
subscribe: () => console.log('Subscribed'),
next: value => console.log('Next:', value),
error: error => console.log('Error:', error),
complete: () => console.log('Completed'),
unsubscribe: () => console.log('Unsubscribed')
})
).subscribe();
```
#### 3. `connectable` 操作符
简化了多播 Observable 的创建。
```javascript
import { interval, connectable } from 'rxjs';
import { take } from 'rxjs/operators';
const source$ = interval(1000).pipe(take(5));
const connectable$ = connectable(source$);
connectable$.subscribe(value => console.log('Subscriber 1:', value));
connectable$.subscribe(value => console.log('Subscriber 2:', value));
connectable$.connect();
```
#### 4. `shareReplay` 的改进
`shareReplay` 现在支持配置对象。
```javascript
import { interval } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';
const shared$ = interval(1000).pipe(
take(5),
shareReplay({
bufferSize: 2,
refCount: true
})
);
```
#### 5. `filter` 的类型推断改进
更好的 TypeScript 类型推断。
```javascript
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
const source$ = of(1, 2, 3, 4, 5);
// RxJS 7 中类型推断更准确
const even$ = source$.pipe(
filter(x => x % 2 === 0) // x 被推断为 number
);
even$.subscribe(x => console.log(x)); // x 是 number 类型
```
### 3. 废弃的操作符
以下操作符在 RxJS 7 中被废弃:
#### 1. `throwError`
**RxJS 6**:
```javascript
import { throwError } from 'rxjs';
throwError('Error message');
```
**RxJS 7**:
```javascript
import { throwError } from 'rxjs';
// 推荐使用工厂函数
throwError(() => new Error('Error message'));
```
#### 2. `concat` 和 `merge` 的静态方法
虽然仍然可用,但推荐使用 `concatWith` 和 `mergeWith`。
```javascript
import { of, concat, merge } from 'rxjs';
// 仍然可用
concat(of(1), of(2)).subscribe();
merge(of(1), of(2)).subscribe();
// 推荐使用
of(1).pipe(concatWith(of(2))).subscribe();
of(1).pipe(mergeWith(of(2))).subscribe();
```
### 4. 性能优化
#### 1. 更小的包体积
RxJS 7 通过树摇优化减少了包体积。
```javascript
// RxJS 7 只导入需要的操作符
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
// 打包时只会包含 of 和 map
```
#### 2. 更快的执行速度
某些操作符的执行速度得到了优化。
```javascript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// RxJS 7 中这些操作符执行更快
of(1, 2, 3, 4, 5).pipe(
map(x => x * 2),
filter(x => x > 5)
).subscribe();
```
### 5. TypeScript 改进
#### 1. 更好的类型推断
```javascript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// RxJS 7 中类型推断更准确
const result$ = of(1, 2, 3).pipe(
map(x => x.toString()), // 推断为 Observable<string>
filter(x => x.length > 0)
);
```
#### 2. 严格类型检查
```javascript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
// RxJS 7 中类型检查更严格
of(1, 2, 3).pipe(
map(x => x.toUpperCase()) // TypeScript 错误:number 没有 toUpperCase
);
```
### 6. 错误处理改进
#### 1. 更好的错误信息
```javascript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(
map(x => {
if (x === 2) throw new Error('Custom error');
return x;
})
).subscribe({
error: error => {
console.log(error.message); // 更清晰的错误信息
}
});
```
#### 2. `onErrorResumeNext` 的改进
```javascript
import { of, onErrorResumeNext } from 'rxjs';
const source1$ = of(1, 2, 3).pipe(
map(x => {
if (x === 2) throw new Error('Error');
return x;
})
);
const source2$ = of(4, 5, 6);
onErrorResumeNext(source1$, source2$).subscribe(console.log);
// 输出: 1, 4, 5, 6
```
### 7. 调度器改进
#### 1. `animationFrameScheduler` 的改进
```javascript
import { interval, animationFrameScheduler } from 'rxjs';
import { take } from 'rxjs/operators';
// RxJS 7 中 animationFrameScheduler 更稳定
interval(0, animationFrameScheduler).pipe(
take(60)
).subscribe(frame => {
console.log('Frame:', frame);
});
```
#### 2. `asapScheduler` 的改进
```javascript
import { of, asapScheduler } from 'rxjs';
// RxJS 7 中 asapScheduler 性能更好
of(1, 2, 3, asapScheduler).subscribe(value => {
console.log(value);
});
```
### 8. 测试工具改进
#### 1. `TestScheduler` 的改进
```javascript
import { TestScheduler } from 'rxjs/testing';
// RxJS 7 中 TestScheduler 更易用
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('-a-b-c|');
const expected = '-a-b-c|';
expectObservable(source$).toBe(expected);
});
```
#### 2. `marble testing` 的改进
```javascript
import { TestScheduler } from 'rxjs/testing';
// RxJS 7 中 marble testing 更直观
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const source$ = cold('-a-b-c|');
const expected = '-a-b-c|';
expectObservable(source$).toBe(expected);
});
```
### 9. 文档和示例改进
#### 1. 更好的文档
RxJS 7 提供了更详细的文档和示例。
```javascript
// 官方文档提供了更多实用的示例
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
map(x => x * 2),
filter(x => x > 5)
).subscribe(console.log);
```
#### 2. 更多的最佳实践
官方文档中包含了更多的最佳实践建议。
```javascript
// 推荐使用 pipe 而不是链式调用
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// ✅ 推荐
of(1, 2, 3).pipe(
map(x => x * 2),
filter(x => x > 5)
).subscribe();
// ❌ 不推荐
of(1, 2, 3).pipe(
map(x => x * 2)
).pipe(
filter(x => x > 5)
).subscribe();
```
### 10. 迁移指南
#### 1. 从 RxJS 6 迁移到 RxJS 7
```bash
# 升级 RxJS
npm install rxjs@7
# 检查废弃的 API
npm run lint
```
#### 2. 常见迁移问题
**问题 1**: `throwError` 的使用
```javascript
// ❌ RxJS 6
throwError('Error message');
// ✅ RxJS 7
throwError(() => new Error('Error message'));
```
**问题 2**: 类型推断问题
```javascript
// 可能需要显式类型
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const result$ = of(1, 2, 3).pipe(
map(x => x.toString())
) as Observable<string>;
```
## 总结
RxJS 7 相比 RxJS 6 的主要改进:
1. **新增操作符**: `partition`、`connectable` 等
2. **性能优化**: 更小的包体积,更快的执行速度
3. **TypeScript 改进**: 更好的类型推断和严格类型检查
4. **错误处理**: 更好的错误信息和处理机制
5. **调度器改进**: 更稳定和高效的调度器
6. **测试工具**: 更易用的 TestScheduler
7. **文档改进**: 更详细的文档和示例
迁移到 RxJS 7 可以带来更好的性能和开发体验,但需要注意一些 API 的变化。
前端 · 2026年2月21日 16:54
RxJS 中的 Observable 和 Promise 有什么区别?## 核心区别
Observable 和 Promise 都是处理异步操作的工具,但它们在设计理念和使用方式上有显著差异:
### 1. 执行时机
**Promise**: 一旦创建就会立即执行,无法取消
```javascript
const promise = new Promise((resolve) => {
console.log('Promise 立即执行');
resolve('done');
});
```
**Observable**: 只有订阅时才会执行,可以取消订阅
```javascript
const observable = new Observable((observer) => {
console.log('Observable 订阅时执行');
observer.next('data');
});
observable.subscribe();
```
### 2. 数据流
**Promise**: 只能发出一个值,然后完成或失败
```javascript
promise.then(value => console.log(value)); // 只接收一个值
```
**Observable**: 可以发出多个值,随时间推移持续推送数据
```javascript
observable.subscribe(value => console.log(value)); // 可以接收多个值
```
### 3. 可取消性
**Promise**: 无法取消,一旦创建就会执行到底
```javascript
const promise = fetch('/api/data');
// 无法中途取消这个请求
```
**Observable**: 可以通过 unsubscribe() 取消订阅
```javascript
const subscription = observable.subscribe();
subscription.unsubscribe(); // 取消订阅
```
### 4. 操作符支持
**Promise**: 只有链式调用 then/catch/finally
```javascript
promise
.then(data => processData(data))
.then(result => console.log(result))
.catch(error => handleError(error));
```
**Observable**: 丰富的操作符生态系统
```javascript
observable
.pipe(
map(data => processData(data)),
filter(result => result.isValid),
catchError(error => handleError(error))
)
.subscribe();
```
### 5. 懒加载 vs 急切加载
**Promise**: 急切执行,创建时就开始工作
**Observable**: 懒加载,直到订阅才开始执行
### 6. 同步/异步
**Promise**: 总是异步的
**Observable**: 可以是同步或异步的
## 实际应用场景
### 使用 Promise 的场景
- 单次异步操作
- 不需要取消
- 只需要一个结果
- 简单的异步流程
### 使用 Observable 的场景
- 需要处理多个值
- 需要取消操作
- 复杂的数据流处理
- 事件处理
- WebSocket 连接
- 实时数据流
## 性能考虑
Observable 在处理复杂异步流程时提供了更好的性能和灵活性,特别是在需要处理多个异步操作或需要取消操作的场景中。Promise 更适合简单的异步操作,代码更简洁直观。
前端 · 2026年2月21日 16:28
RxJS 中常用的操作符有哪些?如何使用?## 常用操作符分类
### 1. 创建操作符 (Creation Operators)
#### of
创建一个发出指定值的 Observable
```javascript
import { of } from 'rxjs';
of(1, 2, 3).subscribe(console.log);
// 输出: 1, 2, 3
```
#### from
将数组、Promise、Iterable 等转换为 Observable
```javascript
import { from } from 'rxjs';
from([1, 2, 3]).subscribe(console.log);
// 输出: 1, 2, 3
from(Promise.resolve('Hello')).subscribe(console.log);
// 输出: Hello
```
#### interval / timer
创建定时发出的 Observable
```javascript
import { interval, timer } from 'rxjs';
interval(1000).subscribe(console.log);
// 每秒发出一个递增数字: 0, 1, 2, 3...
timer(2000, 1000).subscribe(console.log);
// 2秒后开始,每秒发出一个数字: 0, 1, 2, 3...
```
### 2. 转换操作符 (Transformation Operators)
#### map
转换每个发出的值
```javascript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log);
// 输出: 2, 4, 6
```
#### switchMap
取消之前的内部 Observable,只处理最新的
```javascript
import { fromEvent } from 'rxjs';
import { switchMap } from 'rxjs/operators';
fromEvent(document, 'click').pipe(
switchMap(() => fetch('/api/data'))
).subscribe(response => console.log(response));
```
#### mergeMap
并行处理所有内部 Observable
```javascript
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
of(1, 2, 3).pipe(
mergeMap(x => of(x, x * 2))
).subscribe(console.log);
// 输出: 1, 2, 2, 4, 3, 6
```
#### concatMap
顺序处理内部 Observable,一个完成后再处理下一个
```javascript
import { of } from 'rxjs';
import { concatMap } from 'rxjs/operators';
of(1, 2, 3).pipe(
concatMap(x => of(x, x * 2))
).subscribe(console.log);
// 输出: 1, 2, 2, 4, 3, 6 (顺序执行)
```
### 3. 过滤操作符 (Filtering Operators)
#### filter
过滤符合条件的值
```javascript
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log);
// 输出: 2, 4
```
#### take / takeLast / takeUntil
只取前几个、后几个或直到某个条件
```javascript
import { interval, fromEvent } from 'rxjs';
import { take, takeUntil } from 'rxjs/operators';
interval(1000).pipe(
take(3)
).subscribe(console.log);
// 输出: 0, 1, 2
interval(1000).pipe(
takeUntil(fromEvent(document, 'click'))
).subscribe(console.log);
// 点击时停止
```
#### debounceTime
在指定时间内只发出最后一个值
```javascript
import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
fromEvent(inputElement, 'input').pipe(
debounceTime(300)
).subscribe(event => console.log(event.target.value));
```
#### throttleTime
在指定时间内只发出第一个值
```javascript
import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
fromEvent(window, 'scroll').pipe(
throttleTime(200)
).subscribe(event => console.log('Scroll event'));
```
### 4. 组合操作符 (Combination Operators)
#### merge
合并多个 Observable,并行发出值
```javascript
import { merge, interval } from 'rxjs';
merge(
interval(1000).pipe(map(x => `A${x}`)),
interval(1500).pipe(map(x => `B${x}`))
).subscribe(console.log);
```
#### concat
顺序连接多个 Observable
```javascript
import { concat, of } from 'rxjs';
concat(
of(1, 2),
of(3, 4),
of(5, 6)
).subscribe(console.log);
// 输出: 1, 2, 3, 4, 5, 6
```
#### combineLatest
组合多个 Observable 的最新值
```javascript
import { combineLatest, of } from 'rxjs';
combineLatest([
of(1, 2, 3),
of('a', 'b', 'c')
]).subscribe(([num, char]) => console.log(num, char));
// 输出: [3, 'a'], [3, 'b'], [3, 'c']
```
#### zip
按索引组合多个 Observable 的值
```javascript
import { zip, of } from 'rxjs';
zip(
of(1, 2, 3),
of('a', 'b', 'c')
).subscribe(([num, char]) => console.log(num, char));
// 输出: [1, 'a'], [2, 'b'], [3, 'c']
```
### 5. 错误处理操作符 (Error Handling Operators)
#### catchError
捕获错误并返回新的 Observable
```javascript
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
of(1, 2, 3, 4).pipe(
map(x => {
if (x === 3) throw new Error('Error!');
return x;
}),
catchError(error => of('default value'))
).subscribe(console.log);
// 输出: 1, 2, 'default value'
```
#### retry
重试失败的 Observable
```javascript
import { of } from 'rxjs';
import { map, retry } from 'rxjs/operators';
let count = 0;
of(1, 2, 3).pipe(
map(x => {
count++;
if (count < 3) throw new Error('Error!');
return x;
}),
retry(2)
).subscribe(console.log);
```
### 6. 工具操作符 (Utility Operators)
#### tap
执行副作用,不修改值
```javascript
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';
of(1, 2, 3).pipe(
tap(x => console.log('Before:', x)),
map(x => x * 2),
tap(x => console.log('After:', x))
).subscribe();
```
#### delay
延迟发出值
```javascript
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
of(1, 2, 3).pipe(
delay(1000)
).subscribe(console.log);
// 1秒后输出: 1, 2, 3
```
## 实际应用示例
### 搜索框防抖
```javascript
fromEvent(searchInput, 'input').pipe(
debounceTime(300),
map(event => event.target.value),
filter(query => query.length > 2),
switchMap(query => searchAPI(query))
).subscribe(results => displayResults(results));
```
### 自动保存
```javascript
form.valueChanges.pipe(
debounceTime(1000),
distinctUntilChanged(),
switchMap(formData => saveAPI(formData))
).subscribe();
```
### 并行请求
```javascript
merge(
getUserData(userId),
getUserPosts(userId),
getUserComments(userId)
).pipe(
combineLatestAll()
).subscribe(([user, posts, comments]) => {
renderUserProfile(user, posts, comments);
});
```
## 最佳实践
1. 合理使用 pipe() 链式调用
2. 注意操作符的执行顺序
3. 及时取消订阅避免内存泄漏
4. 根据场景选择合适的操作符
5. 使用 TypeScript 获得更好的类型推断
前端 · 2026年2月21日 16:28
如何在 RxJS 中防止内存泄漏?## 内存泄漏的原因
在 RxJS 中,内存泄漏主要发生在以下几种情况:
### 1. 未取消订阅
最常见的内存泄漏原因是订阅了 Observable 但没有取消订阅。
```javascript
// ❌ 错误示例:内存泄漏
class MyComponent {
constructor() {
this.data$ = http.get('/api/data').subscribe(data => {
console.log(data);
});
}
}
// 组件销毁时,订阅仍然存在,导致内存泄漏
```
### 2. 长期运行的 Observable
interval、fromEvent 等会持续发出值的 Observable,如果不取消订阅会持续占用内存。
```javascript
// ❌ 错误示例
setInterval(() => {
console.log('Running...');
}, 1000);
// ✅ 正确示例
const subscription = interval(1000).subscribe();
subscription.unsubscribe();
```
### 3. 闭包引用
订阅回调中引用了外部变量,导致这些变量无法被垃圾回收。
```javascript
// ❌ 错误示例
function createSubscription() {
const largeData = new Array(1000000).fill('data');
return interval(1000).subscribe(() => {
console.log(largeData.length); // largeData 被闭包引用
});
}
const sub = createSubscription();
// 即使 sub 不再使用,largeData 也不会被释放
```
### 4. 事件监听器未移除
使用 fromEvent 创建的订阅如果不取消,事件监听器会一直存在。
```javascript
// ❌ 错误示例
fromEvent(document, 'click').subscribe(event => {
console.log('Clicked');
});
// 事件监听器永远不会被移除
```
## 防止内存泄漏的方法
### 1. 手动取消订阅
最直接的方法是在适当的时候调用 unsubscribe()。
```javascript
class MyComponent {
private subscriptions: Subscription[] = [];
ngOnInit() {
const sub1 = http.get('/api/data').subscribe(data => {
this.data = data;
});
const sub2 = interval(1000).subscribe(() => {
this.update();
});
this.subscriptions.push(sub1, sub2);
}
ngOnDestroy() {
this.subscriptions.forEach(sub => sub.unsubscribe());
}
}
```
### 2. 使用 takeUntil
takeUntil 是最常用的取消订阅方式之一。
```javascript
import { Subject, takeUntil } from 'rxjs';
class MyComponent {
private destroy$ = new Subject<void>();
ngOnInit() {
http.get('/api/data').pipe(
takeUntil(this.destroy$)
).subscribe(data => {
this.data = data;
});
interval(1000).pipe(
takeUntil(this.destroy$)
).subscribe(() => {
this.update();
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
```
### 3. 使用 take、takeWhile、takeLast
根据条件自动取消订阅。
```javascript
// take: 只取前 N 个值
interval(1000).pipe(
take(5)
).subscribe(value => console.log(value));
// 输出: 0, 1, 2, 3, 4 然后自动取消订阅
// takeWhile: 满足条件时继续订阅
interval(1000).pipe(
takeWhile(value => value < 5)
).subscribe(value => console.log(value));
// 输出: 0, 1, 2, 3, 4 然后自动取消订阅
// takeLast: 只取最后 N 个值
of(1, 2, 3, 4, 5).pipe(
takeLast(2)
).subscribe(value => console.log(value));
// 输出: 4, 5
```
### 4. 使用 first
只取第一个值,然后自动取消订阅。
```javascript
http.get('/api/data').pipe(
first()
).subscribe(data => {
console.log(data);
});
// 只发出第一个值就完成
```
### 5. 使用 AsyncPipe(Angular)
在 Angular 中,AsyncPipe 会自动管理订阅。
```typescript
@Component({
template: `
<div *ngIf="data$ | async as data">
{{ data }}
</div>
`
})
export class MyComponent {
data$ = http.get('/api/data');
// AsyncPipe 会自动取消订阅
}
```
### 6. 使用 finalize
在取消订阅时执行清理操作。
```javascript
http.get('/api/data').pipe(
finalize(() => {
console.log('Cleaning up...');
// 执行清理操作
})
).subscribe(data => {
console.log(data);
});
```
## 最佳实践
### 1. 组件级别的订阅管理
```typescript
import { Component, OnDestroy } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';
@Component({
selector: 'app-my',
template: '...'
})
export class MyComponent implements OnDestroy {
private destroy$ = new Subject<void>();
constructor() {
this.setupSubscriptions();
}
private setupSubscriptions() {
// 所有订阅都使用 takeUntil
this.http.get('/api/user').pipe(
takeUntil(this.destroy$)
).subscribe(user => {
this.user = user;
});
this.route.params.pipe(
takeUntil(this.destroy$)
).subscribe(params => {
this.loadPage(params.id);
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
```
### 2. 创建可重用的取消订阅工具
```typescript
import { Subject, Observable } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
export class AutoUnsubscribe {
private destroy$ = new Subject<void>();
protected autoUnsubscribe<T>(observable: Observable<T>): Observable<T> {
return observable.pipe(takeUntil(this.destroy$));
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
// 使用
class MyComponent extends AutoUnsubscribe {
ngOnInit() {
this.autoUnsubscribe(http.get('/api/data')).subscribe(data => {
console.log(data);
});
}
}
```
### 3. 使用 Subscription 集合
```typescript
import { Subscription } from 'rxjs';
class MyService {
private subscriptions = new Subscription();
startMonitoring() {
const sub1 = interval(1000).subscribe();
const sub2 = fromEvent(document, 'click').subscribe();
this.subscriptions.add(sub1);
this.subscriptions.add(sub2);
}
stopMonitoring() {
this.subscriptions.unsubscribe();
}
}
```
### 4. 避免在回调中创建订阅
```typescript
// ❌ 错误示例
interval(1000).subscribe(() => {
http.get('/api/data').subscribe(data => {
console.log(data);
});
// 每次都创建新订阅,无法取消
});
// ✅ 正确示例
interval(1000).pipe(
switchMap(() => http.get('/api/data'))
).subscribe(data => {
console.log(data);
});
// switchMap 会自动取消之前的订阅
```
## 检测内存泄漏
### 1. 使用 Chrome DevTools
```javascript
// 在组件中添加标记
class MyComponent {
private id = Math.random();
ngOnDestroy() {
console.log(`Component ${this.id} destroyed`);
}
}
// 观察控制台,确认组件销毁时是否真的清理了订阅
```
### 2. 使用 RxJS 调试工具
```typescript
import { tap } from 'rxjs/operators';
http.get('/api/data').pipe(
tap({
subscribe: () => console.log('Subscribed'),
unsubscribe: () => console.log('Unsubscribed'),
next: value => console.log('Next:', value),
complete: () => console.log('Completed'),
error: error => console.log('Error:', error)
})
).subscribe();
```
## 常见陷阱
### 1. 忘记取消嵌套订阅
```typescript
// ❌ 错误示例
http.get('/api/user').subscribe(user => {
http.get(`/api/posts/${user.id}`).subscribe(posts => {
console.log(posts);
});
// 内层订阅没有被管理
});
// ✅ 正确示例
http.get('/api/user').pipe(
switchMap(user => http.get(`/api/posts/${user.id}`))
).subscribe(posts => {
console.log(posts);
});
```
### 2. 在服务中创建订阅
```typescript
// ❌ 错误示例
@Injectable()
export class DataService {
constructor(private http: HttpClient) {
this.http.get('/api/data').subscribe(data => {
this.data = data;
});
// 服务中的订阅很难取消
}
}
// ✅ 正确示例
@Injectable()
export class DataService {
private data$ = this.http.get('/api/data');
getData() {
return this.data$;
}
}
```
### 3. 忽略错误处理
```typescript
// ❌ 错误示例
http.get('/api/data').subscribe(data => {
console.log(data);
});
// 错误没有被处理,可能导致订阅无法正常完成
// ✅ 正确示例
http.get('/api/data').pipe(
catchError(error => {
console.error(error);
return of([]);
})
).subscribe(data => {
console.log(data);
});
```
## 总结
防止 RxJS 内存泄漏的关键是:
1. **始终取消订阅**:特别是对于长期运行的 Observable
2. **使用 takeUntil**:这是最推荐的取消订阅方式
3. **避免嵌套订阅**:使用 switchMap、concatMap 等操作符
4. **使用 AsyncPipe**:在 Angular 中优先使用 AsyncPipe
5. **定期检查**:使用 DevTools 检测内存泄漏
6. **错误处理**:确保错误被正确处理,避免订阅卡住
遵循这些最佳实践,可以有效地防止 RxJS 应用中的内存泄漏问题。
前端 · 2026年2月21日 16:28