What is Marble Testing in RxJS, and how do you use it?

February 21, 2026, 16:58

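The concept of Marble Testing

Marble Testing is a string-based, visual testing technique in RxJS. A compact syntax describes the events of an Observable as they unfold over virtual time, which makes asynchronous tests intuitive and easy to read.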

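Marble syntax

Basic symbols

| Symbol | Meaning |
| --- | --- |
| `-` | One frame of virtual time passing (1 ms inside `testScheduler.run()`, 10 ms in the legacy mode outside it) |
| `a`, `b`, `c` | Emitted values |
| `\|` | Completion |
| `#` | Error |
| `()` | Synchronous grouping: everything inside is emitted in the same frame |
| `^` | Subscription point (hot Observable) |
| `!` | Unsubscription point |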

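Examples

```typescript
// `cold` and `hot` are the helpers passed to the `testScheduler.run()` callback.
// Frame numbers assume run mode, where one frame is 1 virtual millisecond.

// Basic example
const source$ = cold('-a-b-c-|');
// a at frame 1, b at frame 3, c at frame 5, completion at frame 7

// Error example
const error$ = cold('-a-b-#');
// a at frame 1, b at frame 3, error at frame 5

// Synchronous example
const sync$ = cold('(abc|)');
// a, b, and c are emitted synchronously at frame 0, followed by completion

// Hot Observable
const hot$ = hot('^-a-b-c-|');
// Counting from the subscription point: a at frame 1, b at frame 3, c at frame 5, completion at frame 7
```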
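
To make the frame arithmetic in the examples above concrete, here is a toy parser that turns a marble string into a list of frame-stamped events. It is only a sketch: `parseMarblesSketch` and `SketchEvent` are hypothetical names, this is not how RxJS implements parsing, and it assumes the string contains only `-`, value characters, `|`, `#`, and `()` (no `^`, `!`, or whitespace).

```typescript
// Toy parser for illustration only; this is NOT the RxJS implementation.
type SketchEvent =
  | { frame: number; kind: 'next'; value: string }
  | { frame: number; kind: 'error' }
  | { frame: number; kind: 'complete' };

function parseMarblesSketch(marbles: string): SketchEvent[] {
  const events: SketchEvent[] = [];
  let frame = 0;       // current virtual frame
  let groupStart = -1; // frame of the opening '(' while inside a group, else -1

  for (let i = 0; i < marbles.length; i++) {
    const ch = marbles.charAt(i);
    // Events inside a group all share the frame of the opening parenthesis.
    const at = groupStart >= 0 ? groupStart : frame;

    if (ch === '(') {
      groupStart = frame;
    } else if (ch === ')') {
      groupStart = -1;
    } else if (ch === '|') {
      events.push({ frame: at, kind: 'complete' });
    } else if (ch === '#') {
      events.push({ frame: at, kind: 'error' });
    } else if (ch !== '-') {
      events.push({ frame: at, kind: 'next', value: ch });
    }
    // Every character, including parentheses, occupies one frame.
    frame += 1;
  }
  return events;
}

// a lands on frame 1, b on frame 3, and both c and the completion on frame 5.
console.log(JSON.stringify(parseMarblesSketch('-a-b-(c|)')));
```

The detail worth noticing is that a group such as `(c|)` emits everything at the frame of its opening parenthesis, yet still takes up four characters of the string.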

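Using TestScheduler

Basic setup

```typescript
import { TestScheduler } from 'rxjs/testing';

describe('My Observable Tests', () => {
  let testScheduler: TestScheduler;

  beforeEach(() => {
    // The callback tells the scheduler how to compare actual and expected results;
    // here it delegates to the test framework's deep-equality assertion.
    testScheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });
});
```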
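
The callback handed to `new TestScheduler(...)` only needs to throw when its two arguments differ, so it does not have to come from a particular test framework. The sketch below shows one way such a callback could look without Jest or Jasmine. `deepEqualSketch` and `assertDeepEqualSketch` are hypothetical helpers, and the comparison assumes plain data (primitives, arrays, and plain objects).

```typescript
// Hypothetical helper, not part of RxJS: structural equality for plain data.
function deepEqualSketch(a: unknown, b: unknown): boolean {
  if (a === b) {
    return true;
  }
  if (typeof a !== 'object' || typeof b !== 'object' || a === null || b === null) {
    return false;
  }
  if (Array.isArray(a) !== Array.isArray(b)) {
    return false;
  }
  const left = a as { [key: string]: unknown };
  const right = b as { [key: string]: unknown };
  const leftKeys = Object.keys(left);
  if (leftKeys.length !== Object.keys(right).length) {
    return false;
  }
  for (let i = 0; i < leftKeys.length; i++) {
    const key = leftKeys[i];
    if (!Object.prototype.hasOwnProperty.call(right, key)) {
      return false;
    }
    if (!deepEqualSketch(left[key], right[key])) {
      return false;
    }
  }
  return true;
}

// Same shape as the callback in the setup above: throw on mismatch, return nothing otherwise.
function assertDeepEqualSketch(actual: unknown, expected: unknown): void {
  if (!deepEqualSketch(actual, expected)) {
    throw new Error(
      'Expected ' + JSON.stringify(expected) + ' but got ' + JSON.stringify(actual)
    );
  }
}
```

Under these assumptions it could be passed directly, as in `new TestScheduler(assertDeepEqualSketch)`. A framework assertion is usually preferable because it prints a readable diff.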

Testing basic operators

```typescript
import { map, filter } from 'rxjs/operators';

it('should map values', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-a-b-c-|');
    const expected = '-A-B-C-|';

    const result$ = source$.pipe(
      map(x => x.toUpperCase())
    );

    // Without a values map, each marble character stands for itself.
    expectObservable(result$).toBe(expected);
  });
});

it('should filter values', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-a-b-c-d-|');
    // b (frame 3) and d (frame 7) are dropped, but their frames still elapse.
    const expected = '-a---c---|';

    const result$ = source$.pipe(
      filter(x => ['a', 'c'].includes(x))
    );

    expectObservable(result$).toBe(expected);
  });
});
```

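Testing time-based operators

```typescript
import { delay, debounceTime, throttleTime } from 'rxjs/operators';

// Inside `testScheduler.run()`, time-based operators use virtual time automatically,
// so no scheduler argument is needed, and one frame equals 1 ms.

it('should delay emissions', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    // The source never completes, so the test asserts emission timing only;
    // how `delay` times the completion can differ between RxJS versions.
    const source$ = cold('-a-b-c-');
    const expected = '----a-b-c'; // every value shifted by 3 frames

    const result$ = source$.pipe(delay(3));

    expectObservable(result$).toBe(expected);
  });
});

it('should debounce', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-a--b---c-|');
    // a (frame 1) is emitted at 3 and b (frame 4) at 6; c (frame 8) is still
    // pending when the source completes at frame 10, so it is flushed there.
    const expected = '---a--b---(c|)';

    const result$ = source$.pipe(debounceTime(2));

    expectObservable(result$).toBe(expected);
  });
});

it('should throttle', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-a-b-c-d-|');
    // b and d arrive while the 3 ms throttle window is open and are dropped.
    const expected = '-a---c---|';

    const result$ = source$.pipe(throttleTime(3));

    expectObservable(result$).toBe(expected);
  });
});
```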
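
Time-based operators can be tested synchronously because the scheduler keeps virtual time: work is stored with a due frame, and a flush runs it in order while jumping the clock forward. The following toy model sketches that idea. `VirtualClockSketch` is a hypothetical class and is far simpler than the real `TestScheduler`.

```typescript
// Toy model of virtual time; hypothetical and far simpler than RxJS's TestScheduler.
interface PendingTaskSketch {
  due: number;   // virtual frame at which the task should run
  order: number; // insertion order, used to break ties
  run: () => void;
}

class VirtualClockSketch {
  now = 0;
  private tasks: PendingTaskSketch[] = [];
  private counter = 0;

  // Register work `delay` frames after the current virtual time.
  schedule(delay: number, run: () => void): void {
    this.tasks.push({ due: this.now + delay, order: this.counter++, run: run });
  }

  // Run every task in due order, moving `now` forward without real waiting.
  flush(): void {
    while (this.tasks.length > 0) {
      this.tasks.sort((x, y) => x.due - y.due || x.order - y.order);
      const next = this.tasks.shift() as PendingTaskSketch;
      this.now = next.due;
      next.run();
    }
  }
}

const clockSketch = new VirtualClockSketch();
const logSketch: string[] = [];

clockSketch.schedule(30, () => logSketch.push('late@' + clockSketch.now));
clockSketch.schedule(10, () => {
  logSketch.push('early@' + clockSketch.now);
  // Work scheduled during a flush is still placed relative to virtual time.
  clockSketch.schedule(5, () => logSketch.push('nested@' + clockSketch.now));
});

clockSketch.flush();
console.log(logSketch.join(', ')); // early@10, nested@15, late@30
```

No timer ever fires in real time here, which is why a debounce or delay of any length can be asserted instantly.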

Testing combination operators

```typescript
import { merge, concat, combineLatest } from 'rxjs';

it('should merge observables', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source1$ = cold('-a---b-|');
    const source2$ = cold('--c-d---|');
    // Values keep their original frames; completion waits for both sources.
    const expected = '-ac-db--|';

    const result$ = merge(source1$, source2$);

    expectObservable(result$).toBe(expected);
  });
});

it('should concatenate observables', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source1$ = cold('-a-b-|');
    const source2$ = cold('--c-d-|');
    // source2$ is subscribed at frame 5, when source1$ completes.
    const expected = '-a-b---c-d-|';

    const result$ = concat(source1$, source2$);

    expectObservable(result$).toBe(expected);
  });
});

it('should combine latest', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source1$ = cold('-a---b-|');
    const source2$ = cold('--c-d---|');
    // Each emission is a pair, so single-character placeholders are mapped to arrays.
    const expected = '--x-yz--|';
    const values = { x: ['a', 'c'], y: ['a', 'd'], z: ['b', 'd'] };

    const result$ = combineLatest([source1$, source2$]);

    expectObservable(result$).toBe(expected, values);
  });
});
```

Testing error handling

```typescript
import { of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

it('should catch errors', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-a-b-#');
    // The fallback value and the completion both land on the error frame.
    const expected = '-a-b-(d|)';

    const result$ = source$.pipe(
      catchError(() => of('d'))
    );

    expectObservable(result$).toBe(expected);
  });
});

it('should retry on error', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-a-#');
    // The resubscription happens at frame 3, so the second a arrives at frame 4.
    const expected = '-a--a-#';

    const result$ = source$.pipe(
      retry(1)
    );

    expectObservable(result$).toBe(expected);
  });
});
```

Testing subscription and unsubscription

```typescript
import { take } from 'rxjs/operators';

it('should handle subscription', () => {
  testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
    const source$ = cold('-a-b-c-|');
    const subs = '^------!';

    // take(5) asks for more values than the source emits, so the source runs to completion.
    const result$ = source$.pipe(take(5));

    expectObservable(result$).toBe('-a-b-c-|');
    expectSubscriptions(source$.subscriptions).toBe(subs);
  });
});

it('should handle unsubscription', () => {
  testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
    const source$ = cold('-a-b-c-d-|');
    // take(2) completes together with b at frame 3 and unsubscribes from the source there.
    const subs = '^--!';

    const result$ = source$.pipe(take(2));

    expectObservable(result$).toBe('-a-(b|)');
    expectSubscriptions(source$.subscriptions).toBe(subs);
  });
});
```
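
Subscription marbles use the same frame counting as value marbles: the position of `^` is the frame of subscription and the position of `!` is the frame of unsubscription. A minimal sketch of that reading follows. `parseSubscriptionSketch` is a hypothetical helper, not the RxJS parser, and it assumes the string contains only `-`, `^`, and `!`.

```typescript
// Toy reader for subscription marbles such as '^---!'; not the RxJS implementation.
interface SubscriptionWindowSketch {
  subscribedFrame: number;   // Infinity when the marble has no '^'
  unsubscribedFrame: number; // Infinity when the marble has no '!'
}

function parseSubscriptionSketch(marbles: string): SubscriptionWindowSketch {
  const subscribedAt = marbles.indexOf('^');
  const unsubscribedAt = marbles.indexOf('!');
  return {
    subscribedFrame: subscribedAt >= 0 ? subscribedAt : Infinity,
    unsubscribedFrame: unsubscribedAt >= 0 ? unsubscribedAt : Infinity,
  };
}

// True while a subscription described by `marbles` would be active at `frame`.
function isSubscribedAtSketch(marbles: string, frame: number): boolean {
  const range = parseSubscriptionSketch(marbles);
  return frame >= range.subscribedFrame && frame < range.unsubscribedFrame;
}

// Subscribed at frame 0, unsubscribed at frame 3.
console.log(JSON.stringify(parseSubscriptionSketch('^--!')));
```

Read this way, a source `-a-b-c-|` piped through `take(2)` is expected to show `^--!`: the second value arrives at frame 3, and that is where the unsubscription is recorded.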

Practical examples

1. Testing a search feature

```typescript
import { of } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

function search(query: string) {
  return of(`Results for ${query}`);
}

it('should search with debounce', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const input$ = cold('-a--b---c-|');
    // Each query is emitted 2 ms after typing pauses; c is flushed on completion.
    const expected = '---x--y---(z|)';
    const values = {
      x: 'Results for a',
      y: 'Results for b',
      z: 'Results for c'
    };

    const result$ = input$.pipe(
      debounceTime(2),
      distinctUntilChanged(),
      switchMap(query => search(query))
    );

    expectObservable(result$).toBe(expected, values);
  });
});
```

2. Testing auto-save

```typescript
import { of } from 'rxjs';
import { debounceTime, switchMap } from 'rxjs/operators';

function save(data: string) {
  return of('Saved');
}

it('should auto-save with debounce', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const changes$ = cold('-a--b---c-|');
    // Every debounced change triggers one save, so the result is always 'Saved'.
    const expected = '---s--s---(s|)';

    const result$ = changes$.pipe(
      debounceTime(2),
      switchMap(data => save(data))
    );

    expectObservable(result$).toBe(expected, { s: 'Saved' });
  });
});
```

3. Testing polling

```typescript
import { interval } from 'rxjs';
import { take, map } from 'rxjs/operators';

it('should poll at intervals', () => {
  testScheduler.run(({ expectObservable }) => {
    // interval(2) first fires at frame 2; take(5) completes with the fifth value.
    const expected = '--a-b-c-d-(e|)';

    const result$ = interval(2).pipe(
      take(5),
      map(x => String.fromCharCode(97 + x)) // 0 -> 'a', 1 -> 'b', ...
    );

    expectObservable(result$).toBe(expected);
  });
});
```

4. Testing caching

```typescript
import { shareReplay } from 'rxjs/operators';

it('should cache values', () => {
  testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
    const source$ = cold('-a-b-c-|');
    const sourceSubs = '^------!';  // the source is subscribed to only once
    const lateSub = '---------^';   // a second subscriber joins at frame 9

    const cached$ = source$.pipe(shareReplay(1));

    expectObservable(cached$).toBe('-a-b-c-|');
    // The late subscriber immediately receives the cached last value and the completion.
    expectObservable(cached$, lateSub).toBe('---------(c|)');
    expectSubscriptions(source$.subscriptions).toBe(sourceSubs);
  });
});
```

Advanced usage

1. Testing a hot Observable

```typescript
import { take } from 'rxjs/operators';

it('should handle hot observable', () => {
  testScheduler.run(({ hot, expectObservable }) => {
    const source$ = hot('--a--b--c--|');
    const sub = '---^--------!';
    // Subscribing at frame 3 misses a; frames in `expected` are still absolute.
    const expected = '-----b--(c|)';

    const result$ = source$.pipe(take(2));

    expectObservable(result$, sub).toBe(expected);
  });
});
```

2. Testing multicasting

```typescript
import { share } from 'rxjs/operators';

it('should multicast correctly', () => {
  testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
    const source$ = cold('-a-b-c-|');
    const expected = '-a-b-c-|';

    const shared$ = source$.pipe(share());

    // Both subscribers see the same values...
    expectObservable(shared$).toBe(expected);
    expectObservable(shared$).toBe(expected);
    // ...while the source itself is subscribed to only once.
    expectSubscriptions(source$.subscriptions).toBe('^------!');
  });
});
```

3. Testing a custom operator

```typescript
import { Observable, OperatorFunction } from 'rxjs';

function customMap<T, R>(project: (value: T) => R): OperatorFunction<T, R> {
  return (source$) => new Observable<R>(subscriber => {
    // Returning the inner subscription lets unsubscription propagate to the source.
    return source$.subscribe({
      next: value => {
        try {
          subscriber.next(project(value));
        } catch (error) {
          // Errors thrown by `project` are forwarded as error notifications.
          subscriber.error(error);
        }
      },
      error: error => subscriber.error(error),
      complete: () => subscriber.complete()
    });
  });
}

it('should use custom operator', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-a-b-c-|');
    const expected = '-A-B-C-|';

    const result$ = source$.pipe(
      customMap((x: string) => x.toUpperCase())
    );

    expectObservable(result$).toBe(expected);
  });
});
```

Best practices

1. Use meaningful values

```typescript
import { map } from 'rxjs/operators';

// ✅ Good: the placeholders mirror the values they stand for
it('should map values', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-a-b-c-|');
    const expected = '-A-B-C-|';

    const result$ = source$.pipe(map(x => x.toUpperCase()));

    expectObservable(result$).toBe(expected, {
      a: 'a', b: 'b', c: 'c',
      A: 'A', B: 'B', C: 'C'
    });
  });
});

// ❌ Bad: the test passes, but d, e, f say nothing about the expected output
it('should map values', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-a-b-c-|');
    const expected = '-d-e-f-|';

    const result$ = source$.pipe(map(x => x.toUpperCase()));

    expectObservable(result$).toBe(expected, { d: 'A', e: 'B', f: 'C' });
  });
});
```

2. Test edge cases

```typescript
import { map } from 'rxjs/operators';

it('should handle empty observable', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('|'); // completes immediately without emitting
    const expected = '|';

    const result$ = source$.pipe(map(x => x.toUpperCase()));

    expectObservable(result$).toBe(expected);
  });
});

it('should handle error observable', () => {
  testScheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('-#'); // errors at frame 1 without emitting
    const expected = '-#';

    const result$ = source$.pipe(map(x => x.toUpperCase()));

    expectObservable(result$).toBe(expected);
  });
});
```

3. Use expectSubscriptions

```typescript
import { take } from 'rxjs/operators';

it('should subscribe and unsubscribe correctly', () => {
  testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
    const source$ = cold('-a-b-c-|');
    // take(3) completes together with c at frame 5, before the source's own completion.
    const subs = '^----!';

    const result$ = source$.pipe(take(3));

    expectObservable(result$).toBe('-a-b-(c|)');
    expectSubscriptions(source$.subscriptions).toBe(subs);
  });
});
```

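Summary

Marble Testing is a powerful testing tool in RxJS. It provides:

  1. Visual tests: strings represent the flow of time, so tests are intuitive to read
  2. Time control: the timing of asynchronous operations is controlled precisely through virtual time
  3. Maintainability: the syntax and structure stay clear
  4. Broad coverage: all kinds of operators and scenarios can be tested

Mastering Marble Testing can significantly improve both the test quality of RxJS code and development efficiency.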
