在微服务架构中使用 TypeORM 需要考虑数据一致性、服务间通信、分布式事务等复杂问题。
微服务中的数据管理
1. 数据库分离策略
// Database-per-service configuration: each service owns its own database.
// config/database.ts
import { DataSource } from 'typeorm';

/**
 * Returns the value of an environment variable, or `fallback` when the
 * variable is unset or empty.
 */
const envOr = (name: string, fallback: string): string =>
  process.env[name] || fallback;

// User service - user-service
/**
 * PostgreSQL connection for the user service, configured from the
 * `USER_DB_*` environment variables with local-development fallbacks.
 * Schema synchronization is off and query logging is on.
 */
export const userDataSource = new DataSource({
  type: 'postgres',
  host: envOr('USER_DB_HOST', 'localhost'),
  port: parseInt(envOr('USER_DB_PORT', '5432')),
  username: envOr('USER_DB_USER', 'user_service'),
  password: envOr('USER_DB_PASSWORD', 'password'),
  database: envOr('USER_DB_NAME', 'user_db'),
  entities: [User, UserProfile, UserSettings],
  synchronize: false,
  logging: true,
});

// Order service - order-service
/**
 * PostgreSQL connection for the order service, configured from the
 * `ORDER_DB_*` environment variables with local-development fallbacks.
 */
export const orderDataSource = new DataSource({
  type: 'postgres',
  host: envOr('ORDER_DB_HOST', 'localhost'),
  port: parseInt(envOr('ORDER_DB_PORT', '5432')),
  username: envOr('ORDER_DB_USER', 'order_service'),
  password: envOr('ORDER_DB_PASSWORD', 'password'),
  database: envOr('ORDER_DB_NAME', 'order_db'),
  entities: [Order, OrderItem, Payment],
  synchronize: false,
  logging: true,
});

// Product service - product-service
/**
 * PostgreSQL connection for the product service, configured from the
 * `PRODUCT_DB_*` environment variables with local-development fallbacks.
 */
export const productDataSource = new DataSource({
  type: 'postgres',
  host: envOr('PRODUCT_DB_HOST', 'localhost'),
  port: parseInt(envOr('PRODUCT_DB_PORT', '5432')),
  username: envOr('PRODUCT_DB_USER', 'product_service'),
  password: envOr('PRODUCT_DB_PASSWORD', 'password'),
  database: envOr('PRODUCT_DB_NAME', 'product_db'),
  entities: [Product, Category, Inventory],
  synchronize: false,
  logging: true,
});
2. 服务间数据同步
// User service - propagate user changes to other services
import { EventEmitter } from 'events';

/**
 * Emits `user.created`, `user.updated` and `user.deleted` events describing
 * changes to users. Listeners are invoked synchronously by `emit`.
 */
class UserSyncService extends EventEmitter {
  /** Emits `user.created` with the user's id, email, name and a timestamp. */
  async syncUserCreated(user: User): Promise<void> {
    this.emit('user.created', {
      userId: user.id,
      email: user.email,
      name: user.name,
      timestamp: new Date(),
    });
  }

  /** Emits `user.updated` with the user's id, email, name and a timestamp. */
  async syncUserUpdated(user: User): Promise<void> {
    this.emit('user.updated', {
      userId: user.id,
      email: user.email,
      name: user.name,
      timestamp: new Date(),
    });
  }

  /** Emits `user.deleted` with the removed user's id and a timestamp. */
  async syncUserDeleted(userId: number): Promise<void> {
    this.emit('user.deleted', {
      userId,
      timestamp: new Date(),
    });
  }
}

// Used inside the user service
/**
 * Entity subscriber that forwards `User` inserts, updates and removals to a
 * {@link UserSyncService}.
 */
@EventSubscriber()
export class UserSubscriber implements EntitySubscriberInterface<User> {
  /**
   * @param syncService Emitter that receives the events. Defaults to a fresh
   *   instance; pass a shared one so other code can attach listeners to it.
   */
  constructor(
    private readonly syncService: UserSyncService = new UserSyncService()
  ) {}

  listenTo() {
    return User;
  }

  afterInsert(event: InsertEvent<User>): void {
    this.forward('user.created', this.syncService.syncUserCreated(event.entity));
  }

  afterUpdate(event: UpdateEvent<User>): void {
    // The update event does not always carry an entity; skip instead of
    // dereferencing undefined.
    const user = event.entity as User | undefined;
    if (!user) {
      return;
    }
    this.forward('user.updated', this.syncService.syncUserUpdated(user));
  }

  afterRemove(event: RemoveEvent<User>): void {
    // Prefer the id reported by the event: the entity may be missing, or may
    // no longer carry its id once it has been removed.
    const userId = event.entityId ?? event.entity?.id;
    if (userId == null) {
      return;
    }
    this.forward('user.deleted', this.syncService.syncUserDeleted(userId));
  }

  /**
   * Keeps the sync fire-and-forget while making sure a throwing listener
   * is logged rather than surfacing as an unhandled rejection.
   */
  private forward(eventName: string, pending: Promise<void>): void {
    pending.catch((error: unknown) => {
      console.error(`Failed to emit ${eventName}:`, error);
    });
  }
}
分布式事务处理
1. Saga 模式实现
// Order-creation saga
/**
 * Runs order creation as a sequence of steps (validate user, check
 * inventory, reserve inventory, create order, process payment). When a step
 * throws, the compensations of the steps that already completed are run in
 * reverse order and the original error is rethrown.
 *
 * A step that fails part-way cleans up its own partial work, because
 * `execute` only compensates steps that completed.
 */
class OrderCreationSaga {
  private steps: SagaStep[] = [];

  constructor(
    private orderDataSource: DataSource,
    private userDataSource: DataSource,
    private productDataSource: DataSource
  ) {
    this.setupSteps();
  }

  /** Registers the saga steps in execution order. */
  private setupSteps() {
    // Step 1: validate the user
    this.steps.push({
      name: 'validateUser',
      execute: async (data: any) => {
        const userRepo = this.userDataSource.getRepository(User);
        const user = await userRepo.findOne({
          where: { id: data.userId },
        });
        if (!user) {
          throw new Error('User not found');
        }
        return { ...data, user };
      },
      compensate: async () => {
        // Validation changes nothing, so there is nothing to undo.
      },
    });

    // Step 2: check inventory
    // NOTE(review): this check and the decrement in step 3 are separate
    // statements, so stock can change between them.
    this.steps.push({
      name: 'checkInventory',
      execute: async (data: any) => {
        const inventoryRepo = this.productDataSource.getRepository(Inventory);
        for (const item of data.items) {
          const inventory = await inventoryRepo.findOne({
            where: { productId: item.productId },
          });
          if (!inventory || inventory.quantity < item.quantity) {
            throw new Error(`Insufficient inventory for product ${item.productId}`);
          }
        }
        return data;
      },
      compensate: async () => {
        // Read-only step, so there is nothing to undo.
      },
    });

    // Step 3: reserve (decrement) inventory
    this.steps.push({
      name: 'reserveInventory',
      execute: async (data: any) => {
        const inventoryRepo = this.productDataSource.getRepository(Inventory);
        const reserved: any[] = [];
        try {
          for (const item of data.items) {
            await inventoryRepo.decrement(
              { productId: item.productId },
              'quantity',
              item.quantity
            );
            reserved.push(item);
          }
        } catch (error) {
          // This step will not be compensated by execute(), so give back
          // what was already taken before reporting the failure.
          try {
            await this.restoreInventory(reserved);
          } catch (restoreError) {
            console.error('Failed to restore partially reserved inventory:', restoreError);
          }
          throw error;
        }
        return data;
      },
      compensate: async (data: any) => {
        // Compensation: give the reserved stock back.
        await this.restoreInventory(data.items);
      },
    });

    // Step 4: create the order
    this.steps.push({
      name: 'createOrder',
      execute: async (data: any) => {
        const orderRepo = this.orderDataSource.getRepository(Order);
        const order = orderRepo.create({
          userId: data.userId,
          items: data.items,
          totalAmount: data.totalAmount,
          status: 'pending',
        });
        const savedOrder = await orderRepo.save(order);
        return { ...data, orderId: savedOrder.id };
      },
      compensate: async (data: any) => {
        // Compensation: delete the order.
        const orderRepo = this.orderDataSource.getRepository(Order);
        await orderRepo.delete(data.orderId);
      },
    });

    // Step 5: process the payment
    this.steps.push({
      name: 'processPayment',
      execute: async (data: any) => {
        const paymentRepo = this.orderDataSource.getRepository(Payment);
        const payment = paymentRepo.create({
          orderId: data.orderId,
          amount: data.totalAmount,
          status: 'processing',
        });
        await paymentRepo.save(payment);
        try {
          // Simulated payment processing.
          await this.processPaymentAsync(payment);
        } catch (error) {
          // The payment row already exists; do not leave it in 'processing'.
          try {
            await this.cancelPayment(data.orderId);
          } catch (cancelError) {
            console.error('Failed to cancel payment after processing error:', cancelError);
          }
          throw error;
        }
        return data;
      },
      compensate: async (data: any) => {
        // Compensation: cancel the payment.
        await this.cancelPayment(data.orderId);
      },
    });
  }

  /**
   * Executes every step in order, threading `data` through them, then marks
   * the order as completed.
   *
   * @param data Saga input; the steps read `userId`, `items` and `totalAmount`.
   * @returns The data returned by the last step (includes `orderId`).
   * @throws The error that made the saga fail, after compensations have run.
   */
  async execute(data: any): Promise<any> {
    const executedSteps: SagaStep[] = [];
    try {
      for (const step of this.steps) {
        console.log(`Executing step: ${step.name}`);
        data = await step.execute(data);
        executedSteps.push(step);
      }
      // Every step succeeded: mark the order as completed.
      await this.orderDataSource.getRepository(Order).update(
        { id: data.orderId },
        { status: 'completed' }
      );
      return data;
    } catch (error) {
      console.error('Saga failed, compensating...', error);
      // Undo completed steps, most recent first. A failing compensation is
      // logged and does not stop the remaining ones.
      for (let i = executedSteps.length - 1; i >= 0; i--) {
        const step = executedSteps[i];
        console.log(`Compensating step: ${step.name}`);
        try {
          await step.compensate(data);
        } catch (compensationError) {
          console.error(`Compensation failed for step ${step.name}:`, compensationError);
        }
      }
      throw error;
    }
  }

  /** Increments stock for each given item by the quantity that was taken. */
  private async restoreInventory(items: any[]): Promise<void> {
    const inventoryRepo = this.productDataSource.getRepository(Inventory);
    for (const item of items) {
      await inventoryRepo.increment(
        { productId: item.productId },
        'quantity',
        item.quantity
      );
    }
  }

  /** Sets the status of the order's payment rows to 'cancelled'. */
  private async cancelPayment(orderId: any): Promise<void> {
    const paymentRepo = this.orderDataSource.getRepository(Payment);
    await paymentRepo.update({ orderId }, { status: 'cancelled' });
  }

  /** Simulated payment processing: resolves after one second. */
  private async processPaymentAsync(_payment: Payment): Promise<void> {
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve();
      }, 1000);
    });
  }
}

/** One saga step: a forward action plus the action that undoes it. */
interface SagaStep {
  name: string;
  execute: (data: any) => Promise<any>;
  compensate: (data: any) => Promise<void>;
}
2. 两阶段提交 (2PC)
// Two-phase commit coordinator
/**
 * Coordinates a two-phase commit across registered participants.
 *
 * Phase 1 asks every participant to prepare. If any prepare fails, every
 * participant is rolled back and the prepare error is rethrown. Phase 2
 * commits every participant; a failing commit does not stop the remaining
 * commits, and the first commit error is rethrown once all were attempted.
 */
class TwoPhaseCommitCoordinator {
  private participants: TwoPhaseCommitParticipant[] = [];

  /** Adds a participant; participants are driven in registration order. */
  registerParticipant(participant: TwoPhaseCommitParticipant) {
    this.participants.push(participant);
  }

  /**
   * Runs prepare then commit across all participants.
   *
   * @throws The prepare error (after rolling back), or the first commit error.
   */
  async execute(): Promise<void> {
    // Phase 1: prepare
    console.log('Phase 1: Prepare');
    try {
      for (const participant of this.participants) {
        await participant.prepare();
      }
    } catch (prepareError) {
      // The transaction is aborted: tell every participant, and report the
      // prepare failure rather than any rollback failure.
      try {
        await this.rollback();
      } catch (rollbackError) {
        console.error('Rollback after failed prepare did not complete:', rollbackError);
      }
      throw prepareError;
    }

    // Phase 2: commit
    console.log('Phase 2: Commit');
    let commitFailed = false;
    let firstCommitError: unknown;
    for (const participant of this.participants) {
      try {
        await participant.commit();
      } catch (commitError) {
        // The decision to commit has been made, so keep committing the
        // others instead of leaving them prepared.
        console.error('Participant failed to commit:', commitError);
        if (!commitFailed) {
          commitFailed = true;
          firstCommitError = commitError;
        }
      }
    }
    if (commitFailed) {
      throw firstCommitError;
    }
  }

  /**
   * Rolls back every registered participant, continuing past failures.
   *
   * @throws The first rollback error, after all participants were attempted.
   */
  async rollback(): Promise<void> {
    console.log('Rolling back');
    let rollbackFailed = false;
    let firstRollbackError: unknown;
    for (const participant of this.participants) {
      try {
        await participant.rollback();
      } catch (rollbackError) {
        console.error('Participant failed to roll back:', rollbackError);
        if (!rollbackFailed) {
          rollbackFailed = true;
          firstRollbackError = rollbackError;
        }
      }
    }
    if (rollbackFailed) {
      throw firstRollbackError;
    }
  }
}

/** Contract each service implements to take part in a two-phase commit. */
interface TwoPhaseCommitParticipant {
  prepare(): Promise<void>;
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

// User service participant
/** Placeholder participant for the user service; each phase only logs. */
class UserServiceParticipant implements TwoPhaseCommitParticipant {
  constructor(private dataSource: DataSource) {}

  async prepare(): Promise<void> {
    console.log('UserService: Preparing...');
    // Prepare user data
  }

  async commit(): Promise<void> {
    console.log('UserService: Committing...');
    // Commit user data
  }

  async rollback(): Promise<void> {
    console.log('UserService: Rolling back...');
    // Roll back user data
  }
}

// Order service participant
/** Placeholder participant for the order service; each phase only logs. */
class OrderServiceParticipant implements TwoPhaseCommitParticipant {
  constructor(private dataSource: DataSource) {}

  async prepare(): Promise<void> {
    console.log('OrderService: Preparing...');
    // Prepare order data
  }

  async commit(): Promise<void> {
    console.log('OrderService: Committing...');
    // Commit order data
  }

  async rollback(): Promise<void> {
    console.log('OrderService: Rolling back...');
    // Roll back order data
  }
}
事件驱动架构
1. 事件总线实现
// Event bus
/** Callback invoked with each published event of a subscribed type. */
type EventHandler = (event: any) => unknown;

/**
 * In-process publish/subscribe bus keyed by `event.type`.
 *
 * Handlers run synchronously in subscription order. A handler that throws,
 * or that returns a promise which rejects, is logged and does not affect the
 * other handlers or the publisher.
 */
class EventBus {
  private subscribers: Map<string, EventHandler[]> = new Map();

  /** Delivers `event` to every handler subscribed to `event.type`. */
  publish(event: any) {
    console.log(`Publishing event: ${event.type}`);
    // Events are dispatched immediately and not retained; keeping every
    // published event would grow without bound.
    this.processEvent(event);
  }

  /** Registers `handler` for events whose `type` equals `eventType`. */
  subscribe(eventType: string, handler: EventHandler) {
    const handlers = this.subscribers.get(eventType);
    if (handlers) {
      handlers.push(handler);
    } else {
      this.subscribers.set(eventType, [handler]);
    }
  }

  private processEvent(event: any) {
    const handlers = this.subscribers.get(event.type);
    if (!handlers) {
      return;
    }
    const report = (error: unknown): void => {
      console.error(`Error processing event ${event.type}:`, error);
    };
    // Iterate over a copy so a handler that subscribes during dispatch does
    // not change the list being walked.
    for (const handler of [...handlers]) {
      try {
        // Async handlers reject after this call returns, so try/catch alone
        // would miss them; attach a rejection handler as well.
        Promise.resolve(handler(event)).catch(report);
      } catch (error) {
        report(error);
      }
    }
  }
}

// Global event bus instance
export const eventBus = new EventBus();

// Publish events from the user service
/** Entity subscriber that publishes `User` inserts and updates on the bus. */
@EventSubscriber()
export class UserEventPublisher implements EntitySubscriberInterface<User> {
  listenTo() {
    return User;
  }

  afterInsert(event: InsertEvent<User>) {
    eventBus.publish({
      type: 'user.created',
      payload: {
        userId: event.entity.id,
        email: event.entity.email,
        name: event.entity.name,
        timestamp: new Date(),
      },
    });
  }

  afterUpdate(event: UpdateEvent<User>) {
    // The update event does not always carry an entity; without one there
    // is nothing to publish.
    const user = event.entity;
    if (!user) {
      return;
    }
    eventBus.publish({
      type: 'user.updated',
      payload: {
        userId: user.id,
        email: user.email,
        name: user.name,
        timestamp: new Date(),
      },
    });
  }
}

// Subscribe to user events from the order service
eventBus.subscribe('user.created', async (event: any) => {
  console.log('OrderService received user.created event:', event);
  // Handle the user-created event
  await handleUserCreated(event.payload);
});

eventBus.subscribe('user.updated', async (event: any) => {
  console.log('OrderService received user.updated event:', event);
  // Handle the user-updated event
  await handleUserUpdated(event.payload);
});
2. 消息队列集成
// RabbitMQ as the message queue
import amqp from 'amqplib';

/**
 * Thin wrapper around one amqplib connection and channel that publishes and
 * consumes JSON messages on durable queues.
 */
class MessageQueueService {
  private connection: any;
  private channel: any;

  /** @param url Broker URL; defaults to the local broker. */
  constructor(private readonly url: string = 'amqp://localhost') {}

  /** Opens the connection and a channel. Call before publish/subscribe. */
  async connect() {
    this.connection = await amqp.connect(this.url);
    this.channel = await this.connection.createChannel();
  }

  /**
   * Serializes `message` as JSON and sends it to `queue` as a persistent
   * message, asserting the queue as durable first.
   *
   * @throws Error when `connect()` has not completed.
   */
  async publish(queue: string, message: any) {
    const channel = this.requireChannel();
    await channel.assertQueue(queue, { durable: true });
    channel.sendToQueue(
      queue,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    );
  }

  /**
   * Consumes `queue`, passing each parsed JSON message to `handler` and
   * acknowledging it once the handler resolves.
   *
   * A message that is not valid JSON is rejected without requeueing, since
   * it can never be parsed. A message whose handler fails is requeued once;
   * if it fails again after redelivery it is rejected without requeueing so
   * it cannot loop forever.
   *
   * @throws Error when `connect()` has not completed.
   */
  async subscribe(queue: string, handler: (message: any) => unknown) {
    const channel = this.requireChannel();
    await channel.assertQueue(queue, { durable: true });
    await channel.consume(queue, async (msg: any) => {
      // amqplib delivers null when the broker cancels the consumer.
      if (msg === null) {
        console.error(`Consumer for queue ${queue} was cancelled by the broker`);
        return;
      }

      let message: unknown;
      try {
        message = JSON.parse(msg.content.toString());
      } catch (error) {
        console.error('Error processing message:', error);
        channel.nack(msg, false, false); // malformed: do not requeue
        return;
      }

      try {
        await handler(message);
        channel.ack(msg);
      } catch (error) {
        console.error('Error processing message:', error);
        // Requeue only the first failure.
        channel.nack(msg, false, !msg.fields.redelivered);
      }
    });
  }

  /** Closes the channel and then the connection, if they were opened. */
  async close() {
    try {
      if (this.channel) {
        await this.channel.close();
      }
    } finally {
      // Close the connection even when closing the channel failed.
      if (this.connection) {
        await this.connection.close();
      }
      this.channel = undefined;
      this.connection = undefined;
    }
  }

  /** Returns the open channel or throws a descriptive error. */
  private requireChannel(): any {
    if (!this.channel) {
      throw new Error('MessageQueueService is not connected; call connect() first');
    }
    return this.channel;
  }
}

// Publish messages from the user service
const messageQueue = new MessageQueueService();
await messageQueue.connect();

/** Entity subscriber that publishes a `user.created` message per insert. */
@EventSubscriber()
export class UserMessagePublisher implements EntitySubscriberInterface<User> {
  listenTo() {
    return User;
  }

  async afterInsert(event: InsertEvent<User>) {
    await messageQueue.publish('user.created', {
      userId: event.entity.id,
      email: event.entity.email,
      name: event.entity.name,
      timestamp: new Date(),
    });
  }
}

// Subscribe to messages from the order service
await messageQueue.subscribe('user.created', async (message: any) => {
  console.log('OrderService received user.created message:', message);
  await handleUserCreated(message);
});
数据一致性策略
1. 最终一致性
// User data synchronization service
/**
 * Pushes user data to the order service over HTTP. Failed attempts are
 * retried with exponential backoff up to a fixed number of retries, so the
 * order service converges eventually without retrying forever.
 */
class UserDataSyncService {
  /** Retries attempted after the initial call before giving up. */
  private static readonly MAX_RETRIES = 5;
  /** Delay before the first retry; doubles for each later retry. */
  private static readonly BASE_RETRY_DELAY_MS = 5000;

  /**
   * POSTs the user's id, email and name to the order service.
   *
   * @param user User to synchronize.
   * @param attempt Number of retries already made for this user (0 for the
   *   initial call).
   */
  async syncUserToOrderService(user: User, attempt = 0): Promise<void> {
    try {
      // Call the order service API
      const response = await fetch('http://order-service/api/users/sync', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          userId: user.id,
          email: user.email,
          name: user.name,
        }),
      });
      // fetch only rejects on network failure; an HTTP error status is a
      // failed sync too and must go through the retry path.
      if (!response.ok) {
        throw new Error(`Order service responded with HTTP ${response.status}`);
      }
    } catch (error) {
      // Record the failure and retry later
      console.error('Failed to sync user to order service:', error);
      await this.scheduleRetry(user, attempt + 1);
    }
  }

  /**
   * Schedules retry number `attempt` for `user`: 5 s for the first retry,
   * doubling each time. Logs and gives up once the retry limit is exceeded.
   *
   * @param user User to synchronize.
   * @param attempt 1-based retry number.
   */
  async scheduleRetry(user: User, attempt = 1): Promise<void> {
    if (attempt > UserDataSyncService.MAX_RETRIES) {
      console.error(
        `Giving up syncing user ${user.id} to order service after ${UserDataSyncService.MAX_RETRIES} retries`
      );
      return;
    }
    const delayMs = UserDataSyncService.BASE_RETRY_DELAY_MS * 2 ** (attempt - 1);
    setTimeout(() => {
      void this.syncUserToOrderService(user, attempt);
    }, delayMs);
  }
}

// Used inside the user service
/** Entity subscriber that syncs inserted and updated users. */
@EventSubscriber()
export class UserSyncSubscriber implements EntitySubscriberInterface<User> {
  private readonly syncService: UserDataSyncService;

  constructor() {
    this.syncService = new UserDataSyncService();
  }

  listenTo() {
    return User;
  }

  afterInsert(event: InsertEvent<User>) {
    // Fire-and-forget: the sync handles its own failures.
    void this.syncService.syncUserToOrderService(event.entity);
  }

  afterUpdate(event: UpdateEvent<User>) {
    // The update event does not always carry an entity.
    const user = event.entity as User | undefined;
    if (!user) {
      return;
    }
    void this.syncService.syncUserToOrderService(user);
  }
}
2. 幂等性处理
// Idempotency handler
/**
 * Runs an operation at most once per id, tracking processed ids either in
 * memory or in the database.
 */
class IdempotencyHandler {
  private processedIds: Set<string> = new Set();
  /** Operations currently running, so concurrent duplicates can join them. */
  private inFlight: Map<string, Promise<void>> = new Map();

  /**
   * @param dataSource Data source holding the `IdempotencyKey` entity; only
   *   required for `processWithDatabaseIdempotency`.
   */
  constructor(private readonly dataSource?: DataSource) {}

  /**
   * Runs `operation` unless `id` was already processed by this instance.
   * The id is recorded only after the operation succeeds, so a failed
   * operation can be retried. A duplicate arriving while the first call is
   * still running waits for that call instead of running again.
   */
  async processWithIdempotency(
    id: string,
    operation: () => Promise<void>
  ): Promise<void> {
    if (this.processedIds.has(id)) {
      console.log(`Operation ${id} already processed, skipping`);
      return;
    }

    const pending = this.inFlight.get(id);
    if (pending) {
      console.log(`Operation ${id} already in progress, waiting for it`);
      return pending;
    }

    const run = (async () => {
      await operation();
      this.processedIds.add(id);
    })();
    this.inFlight.set(id, run);
    try {
      await run;
    } finally {
      this.inFlight.delete(id);
    }
  }

  // Store processed ids in the database
  /**
   * Runs `operation` unless an `IdempotencyKey` row with `key === id`
   * exists, then saves such a row.
   *
   * NOTE(review): the lookup and the save are separate statements, so two
   * concurrent calls can both pass the lookup; a unique constraint on `key`
   * would be needed to close that gap - confirm against the entity.
   *
   * @throws Error when the handler was created without a data source.
   */
  async processWithDatabaseIdempotency(
    id: string,
    operation: () => Promise<void>
  ): Promise<void> {
    if (!this.dataSource) {
      throw new Error('IdempotencyHandler requires a DataSource for database-backed idempotency');
    }
    const idempotencyRepo = this.dataSource.getRepository(IdempotencyKey);
    const existing = await idempotencyRepo.findOne({
      where: { key: id },
    });
    if (existing) {
      console.log(`Operation ${id} already processed, skipping`);
      return;
    }
    await operation();
    await idempotencyRepo.save({
      key: id,
      processedAt: new Date(),
    });
  }
}

// Used when handling messages (order service side, hence its data source)
const idempotencyHandler = new IdempotencyHandler(orderDataSource);

await messageQueue.subscribe('user.created', async (message: any) => {
  await idempotencyHandler.processWithDatabaseIdempotency(
    `user.created.${message.userId}`,
    async () => {
      await handleUserCreated(message);
    }
  );
});
服务发现与负载均衡
1. 服务注册
// Service registry
/**
 * In-memory registry of service instances with round-robin discovery.
 * Instances are identified by their `host` and `port`.
 */
class ServiceRegistry {
  private services: Map<string, ServiceInstance[]> = new Map();
  /** Next round-robin position per service name. */
  private cursors: Map<string, number> = new Map();

  /**
   * Adds an instance to a service. Registering the same host and port again
   * replaces the existing entry instead of adding a duplicate.
   */
  register(serviceName: string, instance: ServiceInstance) {
    let instances = this.services.get(serviceName);
    if (!instances) {
      instances = [];
      this.services.set(serviceName, instances);
    }
    const existing = this.indexOfEndpoint(instances, instance);
    if (existing === -1) {
      instances.push(instance);
    } else {
      instances[existing] = instance;
    }
    console.log(`Registered ${serviceName} instance:`, instance);
  }

  /**
   * Removes the instance with the same host and port, if registered. The
   * argument does not have to be the object that was passed to `register`.
   */
  unregister(serviceName: string, instance: ServiceInstance) {
    const instances = this.services.get(serviceName);
    if (!instances) {
      return;
    }
    const index = this.indexOfEndpoint(instances, instance);
    if (index > -1) {
      instances.splice(index, 1);
      console.log(`Unregistered ${serviceName} instance:`, instance);
    }
  }

  /**
   * Returns the next instance of a service in round-robin order, or `null`
   * when the service has no registered instances.
   */
  discover(serviceName: string): ServiceInstance | null {
    const instances = this.services.get(serviceName);
    if (!instances || instances.length === 0) {
      return null;
    }
    // Round-robin load balancing. The modulo keeps the cursor valid after
    // instances were removed.
    const index = (this.cursors.get(serviceName) ?? 0) % instances.length;
    this.cursors.set(serviceName, index + 1);
    return instances[index] ?? null;
  }

  /** Position of the instance sharing `target`'s host and port, or -1. */
  private indexOfEndpoint(instances: ServiceInstance[], target: ServiceInstance): number {
    return instances.findIndex(
      (candidate) => candidate.host === target.host && candidate.port === target.port
    );
  }
}

/** Network location of one running service instance. */
interface ServiceInstance {
  host: string;
  port: number;
  healthCheckUrl: string;
}

// Register at service startup
const serviceRegistry = new ServiceRegistry();

serviceRegistry.register('user-service', {
  host: 'localhost',
  port: 3001,
  healthCheckUrl: 'http://localhost:3001/health',
});

serviceRegistry.register('order-service', {
  host: 'localhost',
  port: 3002,
  healthCheckUrl: 'http://localhost:3002/health',
});
2. 健康检查
// Health check service
/**
 * Periodically probes service instances over HTTP and unregisters the ones
 * that fail.
 */
class HealthCheckService {
  private timers: Set<ReturnType<typeof setInterval>> = new Set();
  private sweeping = false;

  /**
   * Probes one instance's health-check URL.
   *
   * @param instance Instance to probe.
   * @param timeoutMs Abort the request after this long, so a hung endpoint
   *   counts as unhealthy instead of blocking the sweep.
   * @returns `true` when the endpoint answered with a success status;
   *   `false` on an error status, network failure or timeout.
   */
  async checkService(instance: ServiceInstance, timeoutMs = 5000): Promise<boolean> {
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), timeoutMs);
    try {
      const response = await fetch(instance.healthCheckUrl, {
        signal: controller.signal,
      });
      return response.ok;
    } catch (error) {
      console.error(`Health check failed for ${instance.host}:${instance.port}:`, error);
      return false;
    } finally {
      clearTimeout(timeout);
    }
  }

  /**
   * Starts a periodic sweep over the given services.
   *
   * Each sweep probes the instance `registry.discover()` returns for each
   * service, so only one instance per service is checked per sweep.
   *
   * @param registry Registry to read instances from and remove them from.
   * @param serviceNames Services to check on every sweep.
   * @param intervalMs Time between sweeps; defaults to 30 seconds.
   */
  async startHealthChecks(
    registry: ServiceRegistry,
    serviceNames: readonly string[] = ['user-service', 'order-service', 'product-service'],
    intervalMs = 30000
  ) {
    const timer = setInterval(() => {
      void this.sweep(registry, serviceNames);
    }, intervalMs);
    this.timers.add(timer);
  }

  /** Stops every sweep started by `startHealthChecks`. */
  stopHealthChecks(): void {
    for (const timer of this.timers) {
      clearInterval(timer);
    }
    this.timers.clear();
  }

  /** Runs one pass over the services, unregistering unhealthy instances. */
  private async sweep(registry: ServiceRegistry, serviceNames: readonly string[]): Promise<void> {
    // Skip this tick if the previous sweep is still running.
    if (this.sweeping) {
      return;
    }
    this.sweeping = true;
    try {
      for (const serviceName of serviceNames) {
        const instance = registry.discover(serviceName);
        if (!instance) {
          continue;
        }
        const isHealthy = await this.checkService(instance);
        if (!isHealthy) {
          console.log(`Service ${serviceName} is unhealthy, removing from registry`);
          registry.unregister(serviceName, instance);
        }
      }
    } catch (error) {
      // Nothing awaits the sweep, so failures are logged here rather than
      // left as unhandled rejections.
      console.error('Health check sweep failed:', error);
    } finally {
      this.sweeping = false;
    }
  }
}

// Start the health checks
const healthCheckService = new HealthCheckService();
healthCheckService.startHealthChecks(serviceRegistry);
TypeORM 在微服务架构中的应用需要仔细设计数据管理、事务处理和通信机制,以确保系统的可靠性和可扩展性。