事务是数据库操作的核心概念,它确保一组数据库操作要么全部成功,要么全部失败。TypeORM 提供了多种事务处理方式,让开发者能够轻松管理数据库事务。
事务基础概念
什么是事务
事务是数据库操作的逻辑单元,具有 ACID 特性:
- 原子性 (Atomicity): 事务中的操作要么全部执行,要么全部不执行
- 一致性 (Consistency): 事务执行前后数据库保持一致状态
- 隔离性 (Isolation): 并发事务之间相互隔离
- 持久性 (Durability): 事务提交后,修改永久保存
事务的使用场景
- 银行转账(从一个账户扣款,另一个账户入账)
- 订单处理(创建订单、扣减库存、更新用户余额)
- 多表关联操作
- 需要数据一致性的复杂业务逻辑
TypeORM 事务处理方式
1. 使用 DataSource.transaction()
这是最常用的事务处理方式,自动管理事务的提交和回滚。
typescriptimport { DataSource } from 'typeorm'; async function transferFunds( dataSource: DataSource, fromUserId: number, toUserId: number, amount: number ) { await dataSource.transaction(async transactionalEntityManager => { // 查询转出账户 const fromUser = await transactionalEntityManager.findOne(User, { where: { id: fromUserId }, lock: { mode: 'pessimistic_write' } }); if (!fromUser || fromUser.balance < amount) { throw new Error('Insufficient balance'); } // 查询转入账户 const toUser = await transactionalEntityManager.findOne(User, { where: { id: toUserId }, lock: { mode: 'pessimistic_write' } }); if (!toUser) { throw new Error('Recipient not found'); } // 扣减转出账户余额 fromUser.balance -= amount; await transactionalEntityManager.save(fromUser); // 增加转入账户余额 toUser.balance += amount; await transactionalEntityManager.save(toUser); // 记录交易日志 const transaction = transactionalEntityManager.create(Transaction, { fromUserId, toUserId, amount, type: 'transfer' }); await transactionalEntityManager.save(transaction); }); }
2. 使用 QueryRunner
QueryRunner 提供了更细粒度的事务控制。
typescriptimport { DataSource } from 'typeorm'; async function transferWithQueryRunner( dataSource: DataSource, fromUserId: number, toUserId: number, amount: number ) { const queryRunner = dataSource.createQueryRunner(); try { // 开启事务 await queryRunner.connect(); await queryRunner.startTransaction(); // 查询转出账户 const fromUser = await queryRunner.manager.findOne(User, { where: { id: fromUserId }, lock: { mode: 'pessimistic_write' } }); if (!fromUser || fromUser.balance < amount) { throw new Error('Insufficient balance'); } // 查询转入账户 const toUser = await queryRunner.manager.findOne(User, { where: { id: toUserId }, lock: { mode: 'pessimistic_write' } }); if (!toUser) { throw new Error('Recipient not found'); } // 执行转账操作 await queryRunner.manager.update(User, fromUserId, { balance: fromUser.balance - amount }); await queryRunner.manager.update(User, toUserId, { balance: toUser.balance + amount }); // 记录交易日志 await queryRunner.manager.insert(Transaction, { fromUserId, toUserId, amount, type: 'transfer' }); // 提交事务 await queryRunner.commitTransaction(); } catch (error) { // 回滚事务 await queryRunner.rollbackTransaction(); throw error; } finally { // 释放 QueryRunner await queryRunner.release(); } }
3. 使用装饰器 Transaction
在类方法上使用装饰器声明事务。
typescriptimport { Transaction, TransactionManager, EntityManager } from 'typeorm'; class PaymentService { @Transaction() async processPayment( userId: number, amount: number, @TransactionManager() manager?: EntityManager ) { const user = await manager.findOne(User, { where: { id: userId }, lock: { mode: 'pessimistic_write' } }); if (!user || user.balance < amount) { throw new Error('Insufficient balance'); } user.balance -= amount; await manager.save(user); const payment = manager.create(Payment, { userId, amount, status: 'completed' }); await manager.save(payment); } }
事务隔离级别
设置隔离级别
typescriptimport { DataSource } from 'typeorm'; const dataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: [User, Transaction], // 设置默认隔离级别 extra: { connectionLimit: 10, }, }); // 在事务中设置隔离级别 await dataSource.transaction(async transactionalEntityManager => { const queryRunner = transactionalEntityManager.queryRunner; // 设置隔离级别 await queryRunner.query('SET TRANSACTION ISOLATION LEVEL READ COMMITTED'); // 执行业务逻辑 // ... });
隔离级别说明
-
READ UNCOMMITTED (读未提交)
- 可以读取未提交的数据
- 可能出现脏读、不可重复读、幻读
- 性能最好,但数据一致性最差
-
READ COMMITTED (读已提交)
- 只能读取已提交的数据
- 避免脏读,但可能出现不可重复读、幻读
- 大多数数据库的默认隔离级别
-
REPEATABLE READ (可重复读)
- 在同一事务中多次读取同一数据结果一致
- 避免脏读、不可重复读,但可能出现幻读
- MySQL 的默认隔离级别
-
SERIALIZABLE (串行化)
- 最高隔离级别,完全避免并发问题
- 性能最差,但数据一致性最好
typescript// 设置不同隔离级别的示例 await dataSource.transaction(async transactionalEntityManager => { const queryRunner = transactionalEntityManager.queryRunner; // READ COMMITTED await queryRunner.query('SET TRANSACTION ISOLATION LEVEL READ COMMITTED'); // REPEATABLE READ await queryRunner.query('SET TRANSACTION ISOLATION LEVEL REPEATABLE READ'); // SERIALIZABLE await queryRunner.query('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE'); });
锁机制
悲观锁
悲观锁假设会发生并发冲突,在读取数据时就锁定。
typescript// SELECT ... FOR UPDATE const user = await dataSource.transaction(async transactionalEntityManager => { const user = await transactionalEntityManager.findOne(User, { where: { id: userId }, lock: { mode: 'pessimistic_write' } }); // 此时该行被锁定,其他事务无法修改 user.balance -= amount; await transactionalEntityManager.save(user); return user; }); // SELECT ... FOR SHARE const user = await dataSource.transaction(async transactionalEntityManager => { const user = await transactionalEntityManager.findOne(User, { where: { id: userId }, lock: { mode: 'pessimistic_read' } }); // 其他事务可以读取但不能修改 return user; });
乐观锁
乐观锁假设不会发生并发冲突,通过版本号或时间戳检测冲突。
typescript@Entity() export class Product { @PrimaryGeneratedColumn() id: number; @Column() name: string; @Column() stock: number; @VersionColumn() version: number; } // 使用乐观锁 async function updateProductStock(productId: number, quantity: number) { const product = await dataSource.manager.findOne(Product, { where: { id: productId } }); if (!product || product.stock < quantity) { throw new Error('Insufficient stock'); } product.stock -= quantity; try { await dataSource.manager.save(product); } catch (error) { // 版本冲突,说明其他事务已经修改了数据 throw new Error('Concurrent modification detected'); } }
嵌套事务
使用 Savepoint
typescriptasync function complexOperation(dataSource: DataSource) { await dataSource.transaction(async transactionalEntityManager => { const queryRunner = transactionalEntityManager.queryRunner; try { // 主事务操作 await transactionalEntityManager.save(/* ... */); // 创建保存点 await queryRunner.query('SAVEPOINT savepoint1'); try { // 子事务操作 await transactionalEntityManager.save(/* ... */); // 如果成功,释放保存点 await queryRunner.query('RELEASE SAVEPOINT savepoint1'); } catch (error) { // 如果失败,回滚到保存点 await queryRunner.query('ROLLBACK TO SAVEPOINT savepoint1'); // 继续执行其他操作 } // 继续主事务操作 await transactionalEntityManager.save(/* ... */); } catch (error) { throw error; // 整个事务回滚 } }); }
事务超时和重试
设置事务超时
typescriptasync function transactionWithTimeout( dataSource: DataSource, timeout: number = 30000 ) { const queryRunner = dataSource.createQueryRunner(); try { await queryRunner.connect(); await queryRunner.startTransaction(); // 设置事务超时(MySQL) await queryRunner.query(`SET max_execution_time = ${timeout}`); // 执行业务逻辑 // ... await queryRunner.commitTransaction(); } catch (error) { await queryRunner.rollbackTransaction(); throw error; } finally { await queryRunner.release(); } }
事务重试机制
typescriptasync function transactionWithRetry( dataSource: DataSource, maxRetries: number = 3, operation: (manager: EntityManager) => Promise<void> ) { let lastError: Error; for (let attempt = 1; attempt <= maxRetries; attempt++) { try { await dataSource.transaction(async transactionalEntityManager => { await operation(transactionalEntityManager); }); return; // 成功,退出重试循环 } catch (error) { lastError = error; // 如果是死锁错误,可以重试 if (error.code === 'ER_LOCK_DEADLOCK' && attempt < maxRetries) { await new Promise(resolve => setTimeout(resolve, 100 * attempt)); continue; } throw error; // 其他错误直接抛出 } } throw lastError; } // 使用示例 await transactionWithRetry(dataSource, 3, async manager => { const user = await manager.findOne(User, { where: { id: 1 } }); user.balance += 100; await manager.save(user); });
分布式事务
两阶段提交 (2PC)
TypeORM 本身不支持分布式事务,但可以通过以下方式实现:
typescriptasync function distributedTransaction( dataSource1: DataSource, dataSource2: DataSource ) { const queryRunner1 = dataSource1.createQueryRunner(); const queryRunner2 = dataSource2.createQueryRunner(); try { await queryRunner1.connect(); await queryRunner2.connect(); await queryRunner1.startTransaction(); await queryRunner2.startTransaction(); // 第一阶段:准备 await queryRunner1.manager.save(/* ... */); await queryRunner2.manager.save(/* ... */); // 第二阶段:提交 await queryRunner1.commitTransaction(); await queryRunner2.commitTransaction(); } catch (error) { // 回滚所有事务 await queryRunner1.rollbackTransaction(); await queryRunner2.rollbackTransaction(); throw error; } finally { await queryRunner1.release(); await queryRunner2.release(); } }
最佳实践
1. 保持事务简短
typescript// ❌ 不好的做法:事务时间过长 await dataSource.transaction(async manager => { const user = await manager.findOne(User, { where: { id: 1 } }); // 执行耗时操作(不应该在事务中) await sendEmail(user.email); await processPayment(user); user.balance += 100; await manager.save(user); }); // ✅ 好的做法:只包含数据库操作 const user = await dataSource.manager.findOne(User, { where: { id: 1 } }); // 在事务外执行耗时操作 await sendEmail(user.email); await processPayment(user); // 在事务中只执行数据库操作 await dataSource.transaction(async manager => { user.balance += 100; await manager.save(user); });
2. 正确处理异常
typescriptawait dataSource.transaction(async manager => { try { const user = await manager.findOne(User, { where: { id: 1 } }); if (!user) { throw new Error('User not found'); } user.balance += 100; await manager.save(user); } catch (error) { // 记录错误日志 console.error('Transaction failed:', error); // 抛出错误以触发回滚 throw error; } });
3. 避免嵌套事务
typescript// ❌ 不好的做法:嵌套事务 await dataSource.transaction(async manager1 => { await dataSource.transaction(async manager2 => { // 嵌套事务可能导致问题 }); }); // ✅ 好的做法:使用单一事务 await dataSource.transaction(async manager => { // 所有操作在一个事务中 });
4. 使用适当的隔离级别
typescript// 根据业务需求选择合适的隔离级别 await dataSource.transaction(async manager => { const queryRunner = manager.queryRunner; // 读多写少的场景使用 READ COMMITTED await queryRunner.query('SET TRANSACTION ISOLATION LEVEL READ COMMITTED'); // 需要强一致性的场景使用 SERIALIZABLE // await queryRunner.query('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE'); // 执行业务逻辑 });
TypeORM 的事务处理功能强大而灵活,掌握事务的使用对于构建可靠的数据驱动应用至关重要。正确使用事务可以确保数据的一致性和完整性,避免并发问题。