服务端阅读 06月5日 00:31
TypeORM事务处理:三种API、隔离级别、悲观锁乐观锁和分布式Saga
TypeORM 提供三种写事务的方式:@Transaction 装饰器(0.3.x 已移除)、EntityManager.transaction、和 QueryRunner。选哪种取决于你的场景——简单事务用 EntityManager.transaction,需要精细控制用 QueryRunner。这篇文章把三种方式、隔离级别、锁机制都讲清楚,以及分布式事务的替代方案。三种事务 API方式一:EntityManager.transaction(推荐日常使用)最常用的事务写法——传一个回调函数,回调里所有操作在同一个事务里:await dataSource.transaction(async (manager) => { // 所有操作必须用 manager,不能用全局 repository const user = await manager.findOne(User, { where: { id: 1 } }); user.balance -= 100; await manager.save(user); const target = await manager.findOne(User, { where: { id: 2 } }); target.balance += 100; await manager.save(target);});关键规则:回调里必须用 manager 参数(EntityManager),不能用 dataSource.getRepository()。全局 Repository 不在事务里,它的操作不受事务保护。回调函数正常返回 → 自动 commit。抛异常 → 自动 rollback。方式二:QueryRunner(需要精细控制时)当你需要手动控制 commit/rollback 时机、设置隔离级别、或者同一个 QueryRunner 上执行多个事务时:const queryRunner = dataSource.createQueryRunner();await queryRunner.connect();await queryRunner.startTransaction();try { await queryRunner.manager.save(User, { name: 'Alice', balance: 100 }); await queryRunner.manager.save(User, { name: 'Bob', balance: 200 }); await queryRunner.commitTransaction();} catch (err) { await queryRunner.rollbackTransaction(); throw err;} finally { await queryRunner.release(); // 必须释放,否则连接泄漏}release() 很关键——QueryRunner 持有数据库连接,不释放连接池会耗尽。finally 块里 release 保证即使 commit/rollback 抛异常也能释放。方式三:NestJS 里的 @TransactionEntityManagerNestJS + TypeORM 项目用 @Transaction() 装饰器(TypeORM 0.3.x 已移除,改用 DataSource.transaction):import { Injectable } from '@nestjs/common';import { InjectDataSource } from '@nestjs/typeorm';import { DataSource } from 'typeorm';@Injectable()export class TransferService { constructor( @InjectDataSource() private dataSource: DataSource, ) {} async transfer(fromId: number, toId: number, amount: number) { return this.dataSource.transaction(async (manager) => { const from = await manager.findOne(User, { where: { id: fromId } }); const to = await manager.findOne(User, { where: { id: toId } }); if (from.balance < amount) { throw new Error('余额不足'); } from.balance -= amount; to.balance += amount; await manager.save([from, to]); }); }}隔离级别隔离级别决定事务之间能看到彼此的修改到什么程度。| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 性能 | 适用场景 ||----------|------|-----------|------|------|----------|| READ UNCOMMITTED | ✅ 可能 | ✅ 可能 | ✅ 可能 | 最快 | 几乎不用 || READ COMMITTED | ❌ 不会 | ✅ 可能 | ✅ 可能 | 快 | PostgreSQL 默认 || REPEATABLE READ | ❌ 不会 | ❌ 不会 | ✅ 可能 | 中 | MySQL 默认 || SERIALIZABLE | ❌ 不会 | ❌ 不会 | ❌ 不会 | 最慢 | 金融交易 |设置隔离级别// QueryRunner 方式await queryRunner.startTransaction('SERIALIZABLE');// DataSource.transaction 不支持直接设隔离级别// 需要先用 QueryRunner 设const queryRunner = dataSource.createQueryRunner();await queryRunner.connect();await queryRunner.startTransaction('REPEATABLE READ');实际场景READ COMMITTED:大多数 Web 应用。你的 API 请求处理用这个级别够了——一个请求不会因为另一个请求的未提交数据而出错。REPEATABLE READ:同一个事务里多次读同一行数据,结果必须一致。比如生成报表时,事务期间数据不能变。SERIALIZABLE:转账、库存扣减等绝对不能出错的场景。性能代价大,只在关键业务上用。锁机制悲观锁:数据库层面加锁查询时直接锁行,其他事务不能修改:// 排他锁(FOR UPDATE):其他事务不能读也不能写const user = await manager.findOne(User, { where: { id: 1 }, lock: { mode: 'pessimistic_write' },});// 共享锁(FOR SHARE):其他事务可读不可写const user = await manager.findOne(User, { where: { id: 1 }, lock: { mode: 'pessimistic_read' },});使用场景:扣库存、转账——先锁住行,再修改,防止两个事务同时读到同一个余额然后覆盖。await dataSource.transaction(async (manager) => { // 先锁行 const account = await manager.findOne(Account, { where: { id: 1 }, lock: { mode: 'pessimistic_write' }, }); // 再修改 account.balance -= amount; await manager.save(account);});不加锁的并发问题:事务 A 和事务 B 同时读到余额 1000,各扣 100,都写入 900——应该是 800。加 pessimistic_write 后,事务 B 等 A commit 后才能读,拿到 900 再扣,最终 800。乐观锁:应用层面检查不锁行,更新时检查数据是否被改过。实体加 @Version() 列:@Entity()export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @VersionColumn() version: number; // 每次更新自动 +1}const user = await manager.findOne(User, { where: { id: 1 } });// user.version = 1user.name = 'Alice';await manager.save(user);// SQL: UPDATE user SET name = 'Alice', version = 2 WHERE id = 1 AND version = 1// 如果另一个事务已经改了这行(version 变成 2),WHERE 条件不匹配,影响行数为 0// TypeORM 抛出 OptimisticLockVersionMismatchError乐观锁不锁行,不阻塞读,适合读多写少的场景。缺点是冲突时需要重试。乐观锁重试模式async function updateWithRetry(id: number, updateFn: (user: User) => void, retries = 3) { for (let i = 0; i < retries; i++) { const user = await dataSource.getRepository(User).findOne({ where: { id } }); updateFn(user); try { await dataSource.getRepository(User).save(user); return; } catch (err) { if (err.name === 'OptimisticLockVersionMismatchError' && i < retries - 1) { continue; // 重试 } throw err; } }}死锁两个事务互相等对方释放锁,永远卡住。TypeORM 不会自动检测死锁——依赖数据库的死锁检测机制。MySQL 和 PostgreSQL 会自动检测并回滚其中一个事务。避免死锁的原则:事务尽量短——快进快出,不持有锁太久按固定顺序访问资源——总是先锁 A 再锁 B,不要一个先 A 后 B,另一个先 B 后 A不要在事务里做网络请求——网络慢会长时间持有锁分布式事务TypeORM 没有内置分布式事务支持。如果两个操作分别在不同数据库或不同服务上,不能用 TypeORM 的事务保证原子性。替代方案:Saga 模式把分布式事务拆成一系列本地事务,每步成功后触发下一步,失败时执行补偿操作:async function createOrderSaga(orderData: OrderData) { // 步骤1:创建订单(本地事务) const order = await orderService.createOrder(orderData); try { // 步骤2:扣库存(远程调用) await inventoryService.deduct(orderData.items); try { // 步骤3:扣款(远程调用) await paymentService.charge(orderData.amount); } catch (err) { // 步骤3失败:补偿步骤2(还库存) await inventoryService.restore(orderData.items); throw err; } } catch (err) { // 步骤2失败:补偿步骤1(取消订单) await orderService.cancelOrder(order.id); throw err; }}Saga 不保证强一致性——中间状态对外可见(订单创建了但库存还没扣)。但这是分布式系统里唯一的实用方案——两阶段提交(2PC)性能太差,跨服务基本不用。