乐闻世界logo
搜索文章和话题

TypeORM 如何处理事务?包括事务隔离级别、锁机制和分布式事务的详细说明

2月18日 22:21

事务是数据库操作的核心概念,它确保一组数据库操作要么全部成功,要么全部失败。TypeORM 提供了多种事务处理方式,让开发者能够轻松管理数据库事务。

事务基础概念

什么是事务

事务是数据库操作的逻辑单元,具有 ACID 特性:

  • 原子性 (Atomicity): 事务中的操作要么全部执行,要么全部不执行
  • 一致性 (Consistency): 事务执行前后数据库保持一致状态
  • 隔离性 (Isolation): 并发事务之间相互隔离
  • 持久性 (Durability): 事务提交后,修改永久保存

事务的使用场景

  • 银行转账(从一个账户扣款,另一个账户入账)
  • 订单处理(创建订单、扣减库存、更新用户余额)
  • 多表关联操作
  • 需要数据一致性的复杂业务逻辑

TypeORM 事务处理方式

1. 使用 DataSource.transaction()

这是最常用的事务处理方式,自动管理事务的提交和回滚。

typescript
import { DataSource } from 'typeorm'; async function transferFunds( dataSource: DataSource, fromUserId: number, toUserId: number, amount: number ) { await dataSource.transaction(async transactionalEntityManager => { // 查询转出账户 const fromUser = await transactionalEntityManager.findOne(User, { where: { id: fromUserId }, lock: { mode: 'pessimistic_write' } }); if (!fromUser || fromUser.balance < amount) { throw new Error('Insufficient balance'); } // 查询转入账户 const toUser = await transactionalEntityManager.findOne(User, { where: { id: toUserId }, lock: { mode: 'pessimistic_write' } }); if (!toUser) { throw new Error('Recipient not found'); } // 扣减转出账户余额 fromUser.balance -= amount; await transactionalEntityManager.save(fromUser); // 增加转入账户余额 toUser.balance += amount; await transactionalEntityManager.save(toUser); // 记录交易日志 const transaction = transactionalEntityManager.create(Transaction, { fromUserId, toUserId, amount, type: 'transfer' }); await transactionalEntityManager.save(transaction); }); }

2. 使用 QueryRunner

QueryRunner 提供了更细粒度的事务控制。

typescript
import { DataSource } from 'typeorm'; async function transferWithQueryRunner( dataSource: DataSource, fromUserId: number, toUserId: number, amount: number ) { const queryRunner = dataSource.createQueryRunner(); try { // 开启事务 await queryRunner.connect(); await queryRunner.startTransaction(); // 查询转出账户 const fromUser = await queryRunner.manager.findOne(User, { where: { id: fromUserId }, lock: { mode: 'pessimistic_write' } }); if (!fromUser || fromUser.balance < amount) { throw new Error('Insufficient balance'); } // 查询转入账户 const toUser = await queryRunner.manager.findOne(User, { where: { id: toUserId }, lock: { mode: 'pessimistic_write' } }); if (!toUser) { throw new Error('Recipient not found'); } // 执行转账操作 await queryRunner.manager.update(User, fromUserId, { balance: fromUser.balance - amount }); await queryRunner.manager.update(User, toUserId, { balance: toUser.balance + amount }); // 记录交易日志 await queryRunner.manager.insert(Transaction, { fromUserId, toUserId, amount, type: 'transfer' }); // 提交事务 await queryRunner.commitTransaction(); } catch (error) { // 回滚事务 await queryRunner.rollbackTransaction(); throw error; } finally { // 释放 QueryRunner await queryRunner.release(); } }

3. 使用装饰器 Transaction

在类方法上使用装饰器声明事务。

typescript
import { Transaction, TransactionManager, EntityManager } from 'typeorm'; class PaymentService { @Transaction() async processPayment( userId: number, amount: number, @TransactionManager() manager?: EntityManager ) { const user = await manager.findOne(User, { where: { id: userId }, lock: { mode: 'pessimistic_write' } }); if (!user || user.balance < amount) { throw new Error('Insufficient balance'); } user.balance -= amount; await manager.save(user); const payment = manager.create(Payment, { userId, amount, status: 'completed' }); await manager.save(payment); } }

事务隔离级别

设置隔离级别

typescript
import { DataSource } from 'typeorm'; const dataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: [User, Transaction], // 设置默认隔离级别 extra: { connectionLimit: 10, }, }); // 在事务中设置隔离级别 await dataSource.transaction(async transactionalEntityManager => { const queryRunner = transactionalEntityManager.queryRunner; // 设置隔离级别 await queryRunner.query('SET TRANSACTION ISOLATION LEVEL READ COMMITTED'); // 执行业务逻辑 // ... });

隔离级别说明

  1. READ UNCOMMITTED (读未提交)

    • 可以读取未提交的数据
    • 可能出现脏读、不可重复读、幻读
    • 性能最好,但数据一致性最差
  2. READ COMMITTED (读已提交)

    • 只能读取已提交的数据
    • 避免脏读,但可能出现不可重复读、幻读
    • 大多数数据库的默认隔离级别
  3. REPEATABLE READ (可重复读)

    • 在同一事务中多次读取同一数据结果一致
    • 避免脏读、不可重复读,但可能出现幻读
    • MySQL 的默认隔离级别
  4. SERIALIZABLE (串行化)

    • 最高隔离级别,完全避免并发问题
    • 性能最差,但数据一致性最好
typescript
// 设置不同隔离级别的示例 await dataSource.transaction(async transactionalEntityManager => { const queryRunner = transactionalEntityManager.queryRunner; // READ COMMITTED await queryRunner.query('SET TRANSACTION ISOLATION LEVEL READ COMMITTED'); // REPEATABLE READ await queryRunner.query('SET TRANSACTION ISOLATION LEVEL REPEATABLE READ'); // SERIALIZABLE await queryRunner.query('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE'); });

锁机制

悲观锁

悲观锁假设会发生并发冲突,在读取数据时就锁定。

typescript
// SELECT ... FOR UPDATE const user = await dataSource.transaction(async transactionalEntityManager => { const user = await transactionalEntityManager.findOne(User, { where: { id: userId }, lock: { mode: 'pessimistic_write' } }); // 此时该行被锁定,其他事务无法修改 user.balance -= amount; await transactionalEntityManager.save(user); return user; }); // SELECT ... FOR SHARE const user = await dataSource.transaction(async transactionalEntityManager => { const user = await transactionalEntityManager.findOne(User, { where: { id: userId }, lock: { mode: 'pessimistic_read' } }); // 其他事务可以读取但不能修改 return user; });

乐观锁

乐观锁假设不会发生并发冲突,通过版本号或时间戳检测冲突。

typescript
@Entity() export class Product { @PrimaryGeneratedColumn() id: number; @Column() name: string; @Column() stock: number; @VersionColumn() version: number; } // 使用乐观锁 async function updateProductStock(productId: number, quantity: number) { const product = await dataSource.manager.findOne(Product, { where: { id: productId } }); if (!product || product.stock < quantity) { throw new Error('Insufficient stock'); } product.stock -= quantity; try { await dataSource.manager.save(product); } catch (error) { // 版本冲突,说明其他事务已经修改了数据 throw new Error('Concurrent modification detected'); } }

嵌套事务

使用 Savepoint

typescript
async function complexOperation(dataSource: DataSource) { await dataSource.transaction(async transactionalEntityManager => { const queryRunner = transactionalEntityManager.queryRunner; try { // 主事务操作 await transactionalEntityManager.save(/* ... */); // 创建保存点 await queryRunner.query('SAVEPOINT savepoint1'); try { // 子事务操作 await transactionalEntityManager.save(/* ... */); // 如果成功,释放保存点 await queryRunner.query('RELEASE SAVEPOINT savepoint1'); } catch (error) { // 如果失败,回滚到保存点 await queryRunner.query('ROLLBACK TO SAVEPOINT savepoint1'); // 继续执行其他操作 } // 继续主事务操作 await transactionalEntityManager.save(/* ... */); } catch (error) { throw error; // 整个事务回滚 } }); }

事务超时和重试

设置事务超时

typescript
async function transactionWithTimeout( dataSource: DataSource, timeout: number = 30000 ) { const queryRunner = dataSource.createQueryRunner(); try { await queryRunner.connect(); await queryRunner.startTransaction(); // 设置事务超时(MySQL) await queryRunner.query(`SET max_execution_time = ${timeout}`); // 执行业务逻辑 // ... await queryRunner.commitTransaction(); } catch (error) { await queryRunner.rollbackTransaction(); throw error; } finally { await queryRunner.release(); } }

事务重试机制

typescript
async function transactionWithRetry( dataSource: DataSource, maxRetries: number = 3, operation: (manager: EntityManager) => Promise<void> ) { let lastError: Error; for (let attempt = 1; attempt <= maxRetries; attempt++) { try { await dataSource.transaction(async transactionalEntityManager => { await operation(transactionalEntityManager); }); return; // 成功,退出重试循环 } catch (error) { lastError = error; // 如果是死锁错误,可以重试 if (error.code === 'ER_LOCK_DEADLOCK' && attempt < maxRetries) { await new Promise(resolve => setTimeout(resolve, 100 * attempt)); continue; } throw error; // 其他错误直接抛出 } } throw lastError; } // 使用示例 await transactionWithRetry(dataSource, 3, async manager => { const user = await manager.findOne(User, { where: { id: 1 } }); user.balance += 100; await manager.save(user); });

分布式事务

两阶段提交 (2PC)

TypeORM 本身不支持分布式事务,但可以通过以下方式实现:

typescript
async function distributedTransaction( dataSource1: DataSource, dataSource2: DataSource ) { const queryRunner1 = dataSource1.createQueryRunner(); const queryRunner2 = dataSource2.createQueryRunner(); try { await queryRunner1.connect(); await queryRunner2.connect(); await queryRunner1.startTransaction(); await queryRunner2.startTransaction(); // 第一阶段:准备 await queryRunner1.manager.save(/* ... */); await queryRunner2.manager.save(/* ... */); // 第二阶段:提交 await queryRunner1.commitTransaction(); await queryRunner2.commitTransaction(); } catch (error) { // 回滚所有事务 await queryRunner1.rollbackTransaction(); await queryRunner2.rollbackTransaction(); throw error; } finally { await queryRunner1.release(); await queryRunner2.release(); } }

最佳实践

1. 保持事务简短

typescript
// ❌ 不好的做法:事务时间过长 await dataSource.transaction(async manager => { const user = await manager.findOne(User, { where: { id: 1 } }); // 执行耗时操作(不应该在事务中) await sendEmail(user.email); await processPayment(user); user.balance += 100; await manager.save(user); }); // ✅ 好的做法:只包含数据库操作 const user = await dataSource.manager.findOne(User, { where: { id: 1 } }); // 在事务外执行耗时操作 await sendEmail(user.email); await processPayment(user); // 在事务中只执行数据库操作 await dataSource.transaction(async manager => { user.balance += 100; await manager.save(user); });

2. 正确处理异常

typescript
await dataSource.transaction(async manager => { try { const user = await manager.findOne(User, { where: { id: 1 } }); if (!user) { throw new Error('User not found'); } user.balance += 100; await manager.save(user); } catch (error) { // 记录错误日志 console.error('Transaction failed:', error); // 抛出错误以触发回滚 throw error; } });

3. 避免嵌套事务

typescript
// ❌ 不好的做法:嵌套事务 await dataSource.transaction(async manager1 => { await dataSource.transaction(async manager2 => { // 嵌套事务可能导致问题 }); }); // ✅ 好的做法:使用单一事务 await dataSource.transaction(async manager => { // 所有操作在一个事务中 });

4. 使用适当的隔离级别

typescript
// 根据业务需求选择合适的隔离级别 await dataSource.transaction(async manager => { const queryRunner = manager.queryRunner; // 读多写少的场景使用 READ COMMITTED await queryRunner.query('SET TRANSACTION ISOLATION LEVEL READ COMMITTED'); // 需要强一致性的场景使用 SERIALIZABLE // await queryRunner.query('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE'); // 执行业务逻辑 });

TypeORM 的事务处理功能强大而灵活,掌握事务的使用对于构建可靠的数据驱动应用至关重要。正确使用事务可以确保数据的一致性和完整性,避免并发问题。

标签:TypeORM