乐闻世界logo
搜索文章和话题

TypeORM 的订阅者 (Subscriber) 是什么?如何使用订阅者监听实体事件

2月17日 23:52

订阅者是 TypeORM 中用于监听和响应实体事件的强大机制。它允许开发者在实体生命周期的关键时刻执行自定义逻辑,类似于数据库触发器,但更加灵活和类型安全。

订阅者基础概念

什么是订阅者

订阅者是一个类,它监听特定实体的生命周期事件,并在这些事件发生时执行相应的逻辑。订阅者可以监听的事件包括:

  • beforeInsert: 插入前
  • afterInsert: 插入后
  • beforeUpdate: 更新前
  • afterUpdate: 更新后
  • beforeRemove: 删除前
  • afterRemove: 删除后
  • beforeSoftRemove: 软删除前
  • afterSoftRemove: 软删除后
  • beforeRecover: 恢复前
  • afterRecover: 恢复后

订阅者 vs 监听器

  • 订阅者: 监听所有实体实例的事件,适合全局逻辑
  • 监听器: 在实体内部定义,只监听该实体的事件,适合实体特定的逻辑

创建订阅者

基本订阅者示例

typescript
import { EntitySubscriberInterface, EventSubscriber, InsertEvent, UpdateEvent } from 'typeorm'; import { User } from '../entity/User'; @EventSubscriber() export class UserSubscriber implements EntitySubscriberInterface<User> { // 指定要监听的实体 listenTo() { return User; } // 插入前 beforeInsert(event: InsertEvent<User>) { console.log('Before insert user:', event.entity); // 自动生成用户名 if (!event.entity.username) { event.entity.username = event.entity.email.split('@')[0]; } // 自动设置创建时间 if (!event.entity.createdAt) { event.entity.createdAt = new Date(); } } // 插入后 afterInsert(event: InsertEvent<User>) { console.log('After insert user:', event.entity); // 发送欢迎邮件 this.sendWelcomeEmail(event.entity); // 记录审计日志 this.logAudit('INSERT', event.entity); } // 更新前 beforeUpdate(event: UpdateEvent<User>) { console.log('Before update user:', event.entity); // 自动更新修改时间 if (event.entity) { event.entity.updatedAt = new Date(); } } // 更新后 afterUpdate(event: UpdateEvent<User>) { console.log('After update user:', event.entity); // 记录变更历史 this.recordChangeHistory(event); } // 删除前 beforeRemove(event: any) { console.log('Before remove user:', event.entity); // 检查是否可以删除 if (event.entity.hasActiveOrders()) { throw new Error('Cannot delete user with active orders'); } } // 删除后 afterRemove(event: any) { console.log('After remove user:', event.entity); // 清理关联数据 this.cleanupRelatedData(event.entity.id); } private sendWelcomeEmail(user: User) { // 发送欢迎邮件的逻辑 console.log(`Sending welcome email to ${user.email}`); } private logAudit(action: string, user: User) { // 记录审计日志的逻辑 console.log(`Audit log: ${action} user ${user.id}`); } private recordChangeHistory(event: UpdateEvent<User>) { // 记录变更历史的逻辑 console.log('Recording change history:', event.databaseEntity, event.entity); } private cleanupRelatedData(userId: number) { // 清理关联数据的逻辑 console.log(`Cleaning up data for user ${userId}`); } }

注册订阅者

在 DataSource 中注册

typescript
import { DataSource } from 'typeorm'; import { UserSubscriber } from './subscriber/UserSubscriber'; export const AppDataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: [User, Post, Comment], subscribers: [UserSubscriber], // 注册订阅者 synchronize: false, logging: true, });

动态注册订阅者

typescript
import { DataSource } from 'typeorm'; const dataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: [User, Post, Comment], synchronize: false, logging: true, }); // 初始化后动态添加订阅者 await dataSource.initialize(); const userSubscriber = new UserSubscriber(); dataSource.subscribers.push(userSubscriber);

高级订阅者用法

数据验证

typescript
@EventSubscriber() export class UserSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } beforeInsert(event: InsertEvent<User>) { this.validateUser(event.entity); } beforeUpdate(event: UpdateEvent<User>) { if (event.entity) { this.validateUser(event.entity); } } private validateUser(user: User) { // 验证邮箱格式 const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/; if (!emailRegex.test(user.email)) { throw new Error('Invalid email format'); } // 验证年龄 if (user.age && (user.age < 18 || user.age > 120)) { throw new Error('Age must be between 18 and 120'); } // 验证用户名长度 if (user.username && user.username.length < 3) { throw new Error('Username must be at least 3 characters'); } } }

自动填充字段

typescript
@EventSubscriber() export class BaseEntitySubscriber implements EntitySubscriberInterface { listenTo() { return Object; // 监听所有实体 } beforeInsert(event: InsertEvent<any>) { const entity = event.entity; const now = new Date(); // 自动设置创建时间 if (entity.hasOwnProperty('createdAt') && !entity.createdAt) { entity.createdAt = now; } // 自动设置更新时间 if (entity.hasOwnProperty('updatedAt') && !entity.updatedAt) { entity.updatedAt = now; } // 自动设置创建者 if (entity.hasOwnProperty('createdBy') && !entity.createdBy) { entity.createdBy = this.getCurrentUserId(); } } beforeUpdate(event: UpdateEvent<any>) { const entity = event.entity; if (entity) { // 自动更新更新时间 if (entity.hasOwnProperty('updatedAt')) { entity.updatedAt = new Date(); } // 自动设置更新者 if (entity.hasOwnProperty('updatedBy')) { entity.updatedBy = this.getCurrentUserId(); } } } private getCurrentUserId(): number { // 获取当前用户 ID 的逻辑 return 1; // 示例 } }

审计日志

typescript
@EventSubscriber() export class AuditLogSubscriber implements EntitySubscriberInterface { listenTo() { return Object; // 监听所有实体 } async afterInsert(event: InsertEvent<any>) { await this.createAuditLog('INSERT', event.entity); } async afterUpdate(event: UpdateEvent<any>) { await this.createAuditLog('UPDATE', event.entity, event.databaseEntity); } async afterRemove(event: any) { await this.createAuditLog('DELETE', event.entity); } private async createAuditLog( action: string, entity: any, oldEntity?: any ) { const auditLog = { action, entityType: entity.constructor.name, entityId: entity.id, userId: this.getCurrentUserId(), timestamp: new Date(), changes: oldEntity ? this.getChanges(oldEntity, entity) : null, ipAddress: this.getCurrentIpAddress(), }; // 保存审计日志 console.log('Creating audit log:', auditLog); // await this.auditLogRepository.save(auditLog); } private getChanges(oldEntity: any, newEntity: any): any { const changes: any = {}; for (const key in newEntity) { if (oldEntity[key] !== newEntity[key]) { changes[key] = { old: oldEntity[key], new: newEntity[key], }; } } return changes; } private getCurrentUserId(): number { return 1; // 示例 } private getCurrentIpAddress(): string { return '127.0.0.1'; // 示例 } }

缓存失效

typescript
@EventSubscriber() export class CacheInvalidationSubscriber implements EntitySubscriberInterface { private cacheService: CacheService; constructor() { this.cacheService = new CacheService(); } listenTo() { return Object; // 监听所有实体 } async afterInsert(event: InsertEvent<any>) { await this.invalidateCache(event.entity); } async afterUpdate(event: UpdateEvent<any>) { if (event.entity) { await this.invalidateCache(event.entity); } } async afterRemove(event: any) { await this.invalidateCache(event.entity); } private async invalidateCache(entity: any) { const entityType = entity.constructor.name.toLowerCase(); const entityId = entity.id; // 使单个实体的缓存失效 await this.cacheService.delete(`${entityType}:${entityId}`); // 使列表缓存失效 await this.cacheService.delete(`${entityType}:list:*`); console.log(`Cache invalidated for ${entityType}:${entityId}`); } }

通知和事件

typescript
@EventSubscriber() export class NotificationSubscriber implements EntitySubscriberInterface { private notificationService: NotificationService; constructor() { this.notificationService = new NotificationService(); } listenTo() { return Object; // 监听所有实体 } async afterInsert(event: InsertEvent<any>) { await this.handleInsertEvent(event); } async afterUpdate(event: UpdateEvent<any>) { await this.handleUpdateEvent(event); } private async handleInsertEvent(event: InsertEvent<any>) { const entity = event.entity; // 根据实体类型发送不同的通知 switch (entity.constructor.name) { case 'Order': await this.notificationService.sendOrderCreatedNotification(entity); break; case 'Comment': await this.notificationService.sendCommentNotification(entity); break; case 'Message': await this.notificationService.sendMessageNotification(entity); break; } } private async handleUpdateEvent(event: UpdateEvent<any>) { const entity = event.entity; if (!entity) return; // 根据实体类型和变更发送通知 switch (entity.constructor.name) { case 'Order': if (entity.status !== event.databaseEntity.status) { await this.notificationService.sendOrderStatusChangedNotification(entity); } break; case 'User': if (entity.email !== event.databaseEntity.email) { await this.notificationService.sendEmailChangedNotification(entity); } break; } } }

订阅者最佳实践

1. 单一职责原则

每个订阅者应该只负责一个特定的功能领域。

typescript
// ✅ 好的做法:每个订阅者负责一个功能 @EventSubscriber() export class UserValidationSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } beforeInsert(event: InsertEvent<User>) { /* 验证逻辑 */ } } @EventSubscriber() export class UserAuditSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } afterInsert(event: InsertEvent<User>) { /* 审计逻辑 */ } } // ❌ 不好的做法:一个订阅者负责多个功能 @EventSubscriber() export class UserSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } beforeInsert(event: InsertEvent<User>) { /* 验证逻辑 */ } afterInsert(event: InsertEvent<User>) { /* 审计逻辑 */ } afterUpdate(event: UpdateEvent<User>) { /* 通知逻辑 */ } }

2. 避免循环依赖

订阅者不应该触发会导致其他订阅者无限循环的操作。

typescript
@EventSubscriber() export class UserSubscriber implements EntitySubscriberInterface<User> { constructor( private userRepository: Repository<User> ) {} listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { // ❌ 不好的做法:可能导致循环 // await this.userRepository.save(event.entity); // ✅ 好的做法:使用 EntityManager 避免触发订阅者 await this.userRepository.manager.save(User, event.entity); } }

3. 错误处理

在订阅者中妥善处理错误,避免影响主操作。

typescript
@EventSubscriber() export class SafeSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { try { await this.sendNotification(event.entity); } catch (error) { // 记录错误但不影响主操作 console.error('Failed to send notification:', error); // 可以将错误发送到错误监控系统 } } private async sendNotification(user: User) { // 发送通知的逻辑 } }

4. 性能考虑

避免在订阅者中执行耗时操作。

typescript
@EventSubscriber() export class PerformanceAwareSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { // ❌ 不好的做法:同步执行耗时操作 // await this.sendEmail(event.entity); // await this.generateReport(event.entity); // ✅ 好的做法:异步执行耗时操作 setImmediate(() => { this.sendEmail(event.entity).catch(console.error); this.generateReport(event.entity).catch(console.error); }); // 或者使用消息队列 // await this.queueService.add('send-email', { userId: event.entity.id }); } private async sendEmail(user: User) { // 发送邮件的逻辑 } private async generateReport(user: User) { // 生成报告的逻辑 } }

5. 测试订阅者

为订阅者编写单元测试。

typescript
import { InsertEvent } from 'typeorm'; import { UserSubscriber } from './UserSubscriber'; import { User } from '../entity/User'; describe('UserSubscriber', () => { let subscriber: UserSubscriber; beforeEach(() => { subscriber = new UserSubscriber(); }); it('should auto-generate username before insert', () => { const user = new User(); user.email = 'test@example.com'; const event: InsertEvent<User> = { entity: user, metadata: {} as any, queryRunner: {} as any, manager: {} as any, }; subscriber.beforeInsert(event); expect(user.username).toBe('test'); }); it('should set createdAt before insert', () => { const user = new User(); user.email = 'test@example.com'; const event: InsertEvent<User> = { entity: user, metadata: {} as any, queryRunner: {} as any, manager: {} as any, }; subscriber.beforeInsert(event); expect(user.createdAt).toBeInstanceOf(Date); }); });

订阅者 vs 监听器

监听器示例

typescript
@Entity() export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @Column() email: string; @CreateDateColumn() createdAt: Date; @UpdateDateColumn() updatedAt: Date; @BeforeInsert() beforeInsert() { console.log('Before insert in entity'); this.createdAt = new Date(); } @AfterInsert() afterInsert() { console.log('After insert in entity'); } @BeforeUpdate() beforeUpdate() { console.log('Before update in entity'); this.updatedAt = new Date(); } @AfterUpdate() afterUpdate() { console.log('After update in entity'); } }

选择订阅者还是监听器

使用订阅者的情况:

  • 需要监听多个实体的事件
  • 需要注入其他服务
  • 需要实现跨实体的业务逻辑
  • 需要保持实体类的简洁

使用监听器的情况:

  • 逻辑只与单个实体相关
  • 逻辑简单,不需要外部依赖
  • 希望逻辑与实体紧密关联

TypeORM 的订阅者机制提供了强大而灵活的事件处理能力,合理使用订阅者可以实现复杂的业务逻辑,同时保持代码的整洁和可维护性。

标签:TypeORM