TypeORM事务处理:三种API、隔离级别、悲观锁乐观锁和分布式Saga
TypeORM 提供三种写事务的方式:@Transaction 装饰器(0.3.x 已移除)、EntityManager.transaction、和 QueryRunner。选哪种取决于你的场景——简单事务用 EntityManager.transaction,需要精细控制用 QueryRunner。这篇文章把三种方式、隔离级别、锁机制都讲清楚,以及分布式事务的替代方案。
三种事务 API
方式一:EntityManager.transaction(推荐日常使用)
最常用的事务写法——传一个回调函数,回调里所有操作在同一个事务里:
typescriptawait dataSource.transaction(async (manager) => { // 所有操作必须用 manager,不能用全局 repository const user = await manager.findOne(User, { where: { id: 1 } }); user.balance -= 100; await manager.save(user); const target = await manager.findOne(User, { where: { id: 2 } }); target.balance += 100; await manager.save(target); });
关键规则:回调里必须用 manager 参数(EntityManager),不能用 dataSource.getRepository()。全局 Repository 不在事务里,它的操作不受事务保护。
回调函数正常返回 → 自动 commit。抛异常 → 自动 rollback。
方式二:QueryRunner(需要精细控制时)
当你需要手动控制 commit/rollback 时机、设置隔离级别、或者同一个 QueryRunner 上执行多个事务时:
typescriptconst queryRunner = dataSource.createQueryRunner(); await queryRunner.connect(); await queryRunner.startTransaction(); try { await queryRunner.manager.save(User, { name: 'Alice', balance: 100 }); await queryRunner.manager.save(User, { name: 'Bob', balance: 200 }); await queryRunner.commitTransaction(); } catch (err) { await queryRunner.rollbackTransaction(); throw err; } finally { await queryRunner.release(); // 必须释放,否则连接泄漏 }
release() 很关键——QueryRunner 持有数据库连接,不释放连接池会耗尽。finally 块里 release 保证即使 commit/rollback 抛异常也能释放。
方式三:NestJS 里的 @TransactionEntityManager
NestJS + TypeORM 项目用 @Transaction() 装饰器(TypeORM 0.3.x 已移除,改用 DataSource.transaction):
typescriptimport { Injectable } from '@nestjs/common'; import { InjectDataSource } from '@nestjs/typeorm'; import { DataSource } from 'typeorm'; @Injectable() export class TransferService { constructor( @InjectDataSource() private dataSource: DataSource, ) {} async transfer(fromId: number, toId: number, amount: number) { return this.dataSource.transaction(async (manager) => { const from = await manager.findOne(User, { where: { id: fromId } }); const to = await manager.findOne(User, { where: { id: toId } }); if (from.balance < amount) { throw new Error('余额不足'); } from.balance -= amount; to.balance += amount; await manager.save([from, to]); }); } }
隔离级别
隔离级别决定事务之间能看到彼此的修改到什么程度。
| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 性能 | 适用场景 |
|---|---|---|---|---|---|
| READ UNCOMMITTED | ✅ 可能 | ✅ 可能 | ✅ 可能 | 最快 | 几乎不用 |
| READ COMMITTED | ❌ 不会 | ✅ 可能 | ✅ 可能 | 快 | PostgreSQL 默认 |
| REPEATABLE READ | ❌ 不会 | ❌ 不会 | ✅ 可能 | 中 | MySQL 默认 |
| SERIALIZABLE | ❌ 不会 | ❌ 不会 | ❌ 不会 | 最慢 | 金融交易 |
设置隔离级别
typescript// QueryRunner 方式 await queryRunner.startTransaction('SERIALIZABLE'); // DataSource.transaction 不支持直接设隔离级别 // 需要先用 QueryRunner 设 const queryRunner = dataSource.createQueryRunner(); await queryRunner.connect(); await queryRunner.startTransaction('REPEATABLE READ');
实际场景
READ COMMITTED:大多数 Web 应用。你的 API 请求处理用这个级别够了——一个请求不会因为另一个请求的未提交数据而出错。
REPEATABLE READ:同一个事务里多次读同一行数据,结果必须一致。比如生成报表时,事务期间数据不能变。
SERIALIZABLE:转账、库存扣减等绝对不能出错的场景。性能代价大,只在关键业务上用。
锁机制
悲观锁:数据库层面加锁
查询时直接锁行,其他事务不能修改:
typescript// 排他锁(FOR UPDATE):其他事务不能读也不能写 const user = await manager.findOne(User, { where: { id: 1 }, lock: { mode: 'pessimistic_write' }, }); // 共享锁(FOR SHARE):其他事务可读不可写 const user = await manager.findOne(User, { where: { id: 1 }, lock: { mode: 'pessimistic_read' }, });
使用场景:扣库存、转账——先锁住行,再修改,防止两个事务同时读到同一个余额然后覆盖。
typescriptawait dataSource.transaction(async (manager) => { // 先锁行 const account = await manager.findOne(Account, { where: { id: 1 }, lock: { mode: 'pessimistic_write' }, }); // 再修改 account.balance -= amount; await manager.save(account); });
不加锁的并发问题:事务 A 和事务 B 同时读到余额 1000,各扣 100,都写入 900——应该是 800。加 pessimistic_write 后,事务 B 等 A commit 后才能读,拿到 900 再扣,最终 800。
乐观锁:应用层面检查
不锁行,更新时检查数据是否被改过。实体加 @Version() 列:
typescript@Entity() export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @VersionColumn() version: number; // 每次更新自动 +1 }
typescriptconst user = await manager.findOne(User, { where: { id: 1 } }); // user.version = 1 user.name = 'Alice'; await manager.save(user); // SQL: UPDATE user SET name = 'Alice', version = 2 WHERE id = 1 AND version = 1 // 如果另一个事务已经改了这行(version 变成 2),WHERE 条件不匹配,影响行数为 0 // TypeORM 抛出 OptimisticLockVersionMismatchError
乐观锁不锁行,不阻塞读,适合读多写少的场景。缺点是冲突时需要重试。
乐观锁重试模式
typescriptasync function updateWithRetry(id: number, updateFn: (user: User) => void, retries = 3) { for (let i = 0; i < retries; i++) { const user = await dataSource.getRepository(User).findOne({ where: { id } }); updateFn(user); try { await dataSource.getRepository(User).save(user); return; } catch (err) { if (err.name === 'OptimisticLockVersionMismatchError' && i < retries - 1) { continue; // 重试 } throw err; } } }
死锁
两个事务互相等对方释放锁,永远卡住。TypeORM 不会自动检测死锁——依赖数据库的死锁检测机制。MySQL 和 PostgreSQL 会自动检测并回滚其中一个事务。
避免死锁的原则:
- 事务尽量短——快进快出,不持有锁太久
- 按固定顺序访问资源——总是先锁 A 再锁 B,不要一个先 A 后 B,另一个先 B 后 A
- 不要在事务里做网络请求——网络慢会长时间持有锁
分布式事务
TypeORM 没有内置分布式事务支持。如果两个操作分别在不同数据库或不同服务上,不能用 TypeORM 的事务保证原子性。
替代方案:Saga 模式
把分布式事务拆成一系列本地事务,每步成功后触发下一步,失败时执行补偿操作:
typescriptasync function createOrderSaga(orderData: OrderData) { // 步骤1:创建订单(本地事务) const order = await orderService.createOrder(orderData); try { // 步骤2:扣库存(远程调用) await inventoryService.deduct(orderData.items); try { // 步骤3:扣款(远程调用) await paymentService.charge(orderData.amount); } catch (err) { // 步骤3失败:补偿步骤2(还库存) await inventoryService.restore(orderData.items); throw err; } } catch (err) { // 步骤2失败:补偿步骤1(取消订单) await orderService.cancelOrder(order.id); throw err; } }
Saga 不保证强一致性——中间状态对外可见(订单创建了但库存还没扣)。但这是分布式系统里唯一的实用方案——两阶段提交(2PC)性能太差,跨服务基本不用。