6月5日 00:31

TypeORM事务处理:三种API、隔离级别、悲观锁乐观锁和分布式Saga

TypeORM 提供三种写事务的方式:@Transaction 装饰器(0.3.x 已移除)、EntityManager.transaction、和 QueryRunner。选哪种取决于你的场景——简单事务用 EntityManager.transaction,需要精细控制用 QueryRunner。这篇文章把三种方式、隔离级别、锁机制都讲清楚,以及分布式事务的替代方案。

三种事务 API

方式一:EntityManager.transaction(推荐日常使用)

最常用的事务写法——传一个回调函数,回调里所有操作在同一个事务里:

typescript
await dataSource.transaction(async (manager) => { // 所有操作必须用 manager,不能用全局 repository const user = await manager.findOne(User, { where: { id: 1 } }); user.balance -= 100; await manager.save(user); const target = await manager.findOne(User, { where: { id: 2 } }); target.balance += 100; await manager.save(target); });

关键规则:回调里必须用 manager 参数(EntityManager),不能用 dataSource.getRepository()。全局 Repository 不在事务里,它的操作不受事务保护。

回调函数正常返回 → 自动 commit。抛异常 → 自动 rollback。

方式二:QueryRunner(需要精细控制时)

当你需要手动控制 commit/rollback 时机、设置隔离级别、或者同一个 QueryRunner 上执行多个事务时:

typescript
const queryRunner = dataSource.createQueryRunner(); await queryRunner.connect(); await queryRunner.startTransaction(); try { await queryRunner.manager.save(User, { name: 'Alice', balance: 100 }); await queryRunner.manager.save(User, { name: 'Bob', balance: 200 }); await queryRunner.commitTransaction(); } catch (err) { await queryRunner.rollbackTransaction(); throw err; } finally { await queryRunner.release(); // 必须释放,否则连接泄漏 }

release() 很关键——QueryRunner 持有数据库连接,不释放连接池会耗尽。finally 块里 release 保证即使 commit/rollback 抛异常也能释放。

方式三:NestJS 里的 @TransactionEntityManager

NestJS + TypeORM 项目用 @Transaction() 装饰器(TypeORM 0.3.x 已移除,改用 DataSource.transaction):

typescript
import { Injectable } from '@nestjs/common'; import { InjectDataSource } from '@nestjs/typeorm'; import { DataSource } from 'typeorm'; @Injectable() export class TransferService { constructor( @InjectDataSource() private dataSource: DataSource, ) {} async transfer(fromId: number, toId: number, amount: number) { return this.dataSource.transaction(async (manager) => { const from = await manager.findOne(User, { where: { id: fromId } }); const to = await manager.findOne(User, { where: { id: toId } }); if (from.balance < amount) { throw new Error('余额不足'); } from.balance -= amount; to.balance += amount; await manager.save([from, to]); }); } }

隔离级别

隔离级别决定事务之间能看到彼此的修改到什么程度。

隔离级别脏读不可重复读幻读性能适用场景
READ UNCOMMITTED✅ 可能✅ 可能✅ 可能最快几乎不用
READ COMMITTED❌ 不会✅ 可能✅ 可能PostgreSQL 默认
REPEATABLE READ❌ 不会❌ 不会✅ 可能MySQL 默认
SERIALIZABLE❌ 不会❌ 不会❌ 不会最慢金融交易

设置隔离级别

typescript
// QueryRunner 方式 await queryRunner.startTransaction('SERIALIZABLE'); // DataSource.transaction 不支持直接设隔离级别 // 需要先用 QueryRunner 设 const queryRunner = dataSource.createQueryRunner(); await queryRunner.connect(); await queryRunner.startTransaction('REPEATABLE READ');

实际场景

READ COMMITTED:大多数 Web 应用。你的 API 请求处理用这个级别够了——一个请求不会因为另一个请求的未提交数据而出错。

REPEATABLE READ:同一个事务里多次读同一行数据,结果必须一致。比如生成报表时,事务期间数据不能变。

SERIALIZABLE:转账、库存扣减等绝对不能出错的场景。性能代价大,只在关键业务上用。

锁机制

悲观锁:数据库层面加锁

查询时直接锁行,其他事务不能修改:

typescript
// 排他锁(FOR UPDATE):其他事务不能读也不能写 const user = await manager.findOne(User, { where: { id: 1 }, lock: { mode: 'pessimistic_write' }, }); // 共享锁(FOR SHARE):其他事务可读不可写 const user = await manager.findOne(User, { where: { id: 1 }, lock: { mode: 'pessimistic_read' }, });

使用场景:扣库存、转账——先锁住行,再修改,防止两个事务同时读到同一个余额然后覆盖。

typescript
await dataSource.transaction(async (manager) => { // 先锁行 const account = await manager.findOne(Account, { where: { id: 1 }, lock: { mode: 'pessimistic_write' }, }); // 再修改 account.balance -= amount; await manager.save(account); });

不加锁的并发问题:事务 A 和事务 B 同时读到余额 1000,各扣 100,都写入 900——应该是 800。加 pessimistic_write 后,事务 B 等 A commit 后才能读,拿到 900 再扣,最终 800。

乐观锁:应用层面检查

不锁行,更新时检查数据是否被改过。实体加 @Version() 列:

typescript
@Entity() export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @VersionColumn() version: number; // 每次更新自动 +1 }
typescript
const user = await manager.findOne(User, { where: { id: 1 } }); // user.version = 1 user.name = 'Alice'; await manager.save(user); // SQL: UPDATE user SET name = 'Alice', version = 2 WHERE id = 1 AND version = 1 // 如果另一个事务已经改了这行(version 变成 2),WHERE 条件不匹配,影响行数为 0 // TypeORM 抛出 OptimisticLockVersionMismatchError

乐观锁不锁行,不阻塞读,适合读多写少的场景。缺点是冲突时需要重试。

乐观锁重试模式

typescript
async function updateWithRetry(id: number, updateFn: (user: User) => void, retries = 3) { for (let i = 0; i < retries; i++) { const user = await dataSource.getRepository(User).findOne({ where: { id } }); updateFn(user); try { await dataSource.getRepository(User).save(user); return; } catch (err) { if (err.name === 'OptimisticLockVersionMismatchError' && i < retries - 1) { continue; // 重试 } throw err; } } }

死锁

两个事务互相等对方释放锁,永远卡住。TypeORM 不会自动检测死锁——依赖数据库的死锁检测机制。MySQL 和 PostgreSQL 会自动检测并回滚其中一个事务。

避免死锁的原则:

  • 事务尽量短——快进快出,不持有锁太久
  • 按固定顺序访问资源——总是先锁 A 再锁 B,不要一个先 A 后 B,另一个先 B 后 A
  • 不要在事务里做网络请求——网络慢会长时间持有锁

分布式事务

TypeORM 没有内置分布式事务支持。如果两个操作分别在不同数据库或不同服务上,不能用 TypeORM 的事务保证原子性。

替代方案:Saga 模式

把分布式事务拆成一系列本地事务,每步成功后触发下一步,失败时执行补偿操作:

typescript
async function createOrderSaga(orderData: OrderData) { // 步骤1:创建订单(本地事务) const order = await orderService.createOrder(orderData); try { // 步骤2:扣库存(远程调用) await inventoryService.deduct(orderData.items); try { // 步骤3:扣款(远程调用) await paymentService.charge(orderData.amount); } catch (err) { // 步骤3失败:补偿步骤2(还库存) await inventoryService.restore(orderData.items); throw err; } } catch (err) { // 步骤2失败:补偿步骤1(取消订单) await orderService.cancelOrder(order.id); throw err; } }

Saga 不保证强一致性——中间状态对外可见(订单创建了但库存还没扣)。但这是分布式系统里唯一的实用方案——两阶段提交(2PC)性能太差,跨服务基本不用。

标签:TypeORM