2月18日 22:18
How do you use TypeORM in a microservices architecture, including data consistency, distributed transactions, and inter-service communication?
Using TypeORM in a microservices architecture means dealing with several hard problems: data consistency across services, inter-service communication, and distributed transactions.
Data Management in Microservices
1. Database Separation Strategy
// config/database.ts
import { DataSource } from 'typeorm';

/**
 * User service (user-service).
 * Connection settings come from USER_DB_* environment variables, with local
 * development fallbacks when a variable is unset or empty.
 */
export const userDataSource = new DataSource({
  type: 'postgres',
  // Schema behaviour: no automatic schema sync, query logging switched on.
  synchronize: false,
  logging: true,
  entities: [User, UserProfile, UserSettings],
  // Where the database lives.
  host: process.env.USER_DB_HOST || 'localhost',
  port: Number.parseInt(process.env.USER_DB_PORT || '5432'),
  database: process.env.USER_DB_NAME || 'user_db',
  // Credentials.
  username: process.env.USER_DB_USER || 'user_service',
  password: process.env.USER_DB_PASSWORD || 'password',
});

/**
 * Order service (order-service).
 * Same layout as the user service, driven by ORDER_DB_* variables.
 */
export const orderDataSource = new DataSource({
  type: 'postgres',
  synchronize: false,
  logging: true,
  entities: [Order, OrderItem, Payment],
  host: process.env.ORDER_DB_HOST || 'localhost',
  port: Number.parseInt(process.env.ORDER_DB_PORT || '5432'),
  database: process.env.ORDER_DB_NAME || 'order_db',
  username: process.env.ORDER_DB_USER || 'order_service',
  password: process.env.ORDER_DB_PASSWORD || 'password',
});

/**
 * Product service (product-service).
 * Same layout again, driven by PRODUCT_DB_* variables.
 */
export const productDataSource = new DataSource({
  type: 'postgres',
  synchronize: false,
  logging: true,
  entities: [Product, Category, Inventory],
  host: process.env.PRODUCT_DB_HOST || 'localhost',
  port: Number.parseInt(process.env.PRODUCT_DB_PORT || '5432'),
  database: process.env.PRODUCT_DB_NAME || 'product_db',
  username: process.env.PRODUCT_DB_USER || 'product_service',
  password: process.env.PRODUCT_DB_PASSWORD || 'password',
});
2. Inter-Service Data Synchronization
// User service - sync user data to other services
import { EventEmitter } from 'events';

/**
 * Emits `user.created`, `user.updated` and `user.deleted` events describing
 * changes to users. Consumers attach listeners with the usual
 * `EventEmitter` API (`on`, `once`, ...).
 */
class UserSyncService extends EventEmitter {
  /** Emits `user.created` with the user's id, email, name and a timestamp. */
  async syncUserCreated(user: User) {
    this.emit('user.created', {
      userId: user.id,
      email: user.email,
      name: user.name,
      timestamp: new Date(),
    });
  }

  /** Emits `user.updated` with the user's id, email, name and a timestamp. */
  async syncUserUpdated(user: User) {
    this.emit('user.updated', {
      userId: user.id,
      email: user.email,
      name: user.name,
      timestamp: new Date(),
    });
  }

  /** Emits `user.deleted` with the removed user's id and a timestamp. */
  async syncUserDeleted(userId: number) {
    this.emit('user.deleted', {
      userId,
      timestamp: new Date(),
    });
  }
}

/**
 * Shared emitter used by `UserSubscriber` by default. Exported so that other
 * code can actually attach listeners; an emitter created privately inside the
 * subscriber would emit events nobody is able to observe.
 */
export const userSyncService = new UserSyncService();

// Use in user service
/**
 * TypeORM entity subscriber that forwards `User` insert/update/remove events
 * to a `UserSyncService`.
 *
 * NOTE(review): these hooks run as part of the persistence operation, so an
 * event may be emitted for a change whose transaction is later rolled back.
 * Confirm whether consumers tolerate that.
 */
@EventSubscriber()
export class UserSubscriber implements EntitySubscriberInterface<User> {
  private syncService: UserSyncService;

  /**
   * @param syncService Emitter to forward events to. Defaults to the shared
   *   `userSyncService` so a no-argument construction still works.
   */
  constructor(syncService: UserSyncService = userSyncService) {
    this.syncService = syncService;
  }

  listenTo() {
    return User;
  }

  afterInsert(event: InsertEvent<User>) {
    this.forward('user.created', () =>
      this.syncService.syncUserCreated(event.entity)
    );
  }

  afterUpdate(event: UpdateEvent<User>) {
    // `entity` is optional on UpdateEvent; skip when there is nothing to send.
    const user = event.entity as User | undefined;
    if (!user) {
      return;
    }
    this.forward('user.updated', () => this.syncService.syncUserUpdated(user));
  }

  afterRemove(event: RemoveEvent<User>) {
    // `entity` is optional on RemoveEvent; fall back to `entityId`.
    const userId = event.entity?.id ?? event.entityId;
    if (userId === undefined || userId === null) {
      return;
    }
    this.forward('user.deleted', () => this.syncService.syncUserDeleted(userId));
  }

  /**
   * Runs a sync call without blocking the persistence operation and logs a
   * failure (for example a throwing listener) instead of leaving an unhandled
   * promise rejection.
   */
  private forward(eventName: string, task: () => Promise<void>): void {
    task().catch((error: unknown) => {
      console.error(`Failed to emit ${eventName}:`, error);
    });
  }
}
Distributed Transaction Handling
1. Saga Pattern Implementation
// Order creation Saga

/**
 * Creates an order as a sequence of steps (validate user, check inventory,
 * reserve inventory, create order, process payment). When a step fails, the
 * steps that already completed are compensated in reverse order.
 *
 * A step that fails part-way through is NOT in the list of completed steps,
 * so each step with side effects undoes its own partial work before
 * rethrowing.
 */
class OrderCreationSaga {
  private steps: SagaStep[] = [];

  constructor(
    private orderDataSource: DataSource,
    private userDataSource: DataSource,
    private productDataSource: DataSource
  ) {
    this.setupSteps();
  }

  /** Registers the saga steps in execution order. */
  private setupSteps() {
    // Step 1: Validate user
    this.steps.push({
      name: 'validateUser',
      execute: async (data: any) => {
        const userRepo = this.userDataSource.getRepository(User);
        const user = await userRepo.findOne({
          where: { id: data.userId },
        });
        if (!user) {
          throw new Error('User not found');
        }
        return { ...data, user };
      },
      compensate: async (_data: any) => {
        // Validation step doesn't need compensation
      },
    });

    // Step 2: Check inventory
    // NOTE(review): this check and the decrement in step 3 are separate
    // statements, so two concurrent sagas can both pass the check and
    // oversell. Consider a conditional decrement if that matters.
    this.steps.push({
      name: 'checkInventory',
      execute: async (data: any) => {
        const inventoryRepo = this.productDataSource.getRepository(Inventory);
        for (const item of data.items) {
          const inventory = await inventoryRepo.findOne({
            where: { productId: item.productId },
          });
          if (!inventory || inventory.quantity < item.quantity) {
            throw new Error(`Insufficient inventory for product ${item.productId}`);
          }
        }
        return data;
      },
      compensate: async (_data: any) => {
        // Inventory check doesn't need compensation
      },
    });

    // Step 3: Reserve inventory
    this.steps.push({
      name: 'reserveInventory',
      execute: async (data: any) => {
        const inventoryRepo = this.productDataSource.getRepository(Inventory);
        // Items decremented so far, so a mid-loop failure can be undone.
        const reserved: any[] = [];
        try {
          for (const item of data.items) {
            await inventoryRepo.decrement(
              { productId: item.productId },
              'quantity',
              item.quantity
            );
            reserved.push(item);
          }
        } catch (error) {
          try {
            await this.releaseInventory(reserved);
          } catch (releaseError) {
            console.error('Failed to release partially reserved inventory:', releaseError);
          }
          throw error;
        }
        return data;
      },
      compensate: async (data: any) => {
        // Compensate: restore inventory
        await this.releaseInventory(data.items);
      },
    });

    // Step 4: Create order
    this.steps.push({
      name: 'createOrder',
      execute: async (data: any) => {
        const orderRepo = this.orderDataSource.getRepository(Order);
        const order = orderRepo.create({
          userId: data.userId,
          items: data.items,
          totalAmount: data.totalAmount,
          status: 'pending',
        });
        const savedOrder = await orderRepo.save(order);
        return { ...data, orderId: savedOrder.id };
      },
      compensate: async (data: any) => {
        // Compensate: delete order
        const orderRepo = this.orderDataSource.getRepository(Order);
        await orderRepo.delete(data.orderId);
      },
    });

    // Step 5: Process payment
    this.steps.push({
      name: 'processPayment',
      execute: async (data: any) => {
        const paymentRepo = this.orderDataSource.getRepository(Payment);
        const payment = paymentRepo.create({
          orderId: data.orderId,
          amount: data.totalAmount,
          status: 'processing',
        });
        await paymentRepo.save(payment);
        try {
          // Simulate payment processing
          await this.processPaymentAsync(payment);
        } catch (error) {
          // The payment row already exists; do not leave it in 'processing'.
          try {
            await paymentRepo.update(
              { orderId: data.orderId },
              { status: 'cancelled' }
            );
          } catch (cancelError) {
            console.error('Failed to cancel payment after processing error:', cancelError);
          }
          throw error;
        }
        return data;
      },
      compensate: async (data: any) => {
        // Compensate: cancel payment
        const paymentRepo = this.orderDataSource.getRepository(Payment);
        await paymentRepo.update(
          { orderId: data.orderId },
          { status: 'cancelled' }
        );
      },
    });
  }

  /**
   * Runs every step in order and then marks the order as completed.
   *
   * @param data Initial saga data; the steps read `userId`, `items` and
   *   `totalAmount` from it.
   * @returns The data object produced by the last step (includes `orderId`).
   * @throws The error of the failing step, after completed steps have been
   *   compensated in reverse order. Compensation failures are logged and do
   *   not replace the original error.
   */
  async execute(data: any): Promise<any> {
    const executedSteps: SagaStep[] = [];
    let context = data;
    try {
      for (const step of this.steps) {
        console.log(`Executing step: ${step.name}`);
        context = await step.execute(context);
        executedSteps.push(step);
      }
      // All steps successful, update order status
      await this.orderDataSource.getRepository(Order).update(
        { id: context.orderId },
        { status: 'completed' }
      );
      return context;
    } catch (error) {
      console.error('Saga failed, compensating...', error);
      // Execute compensation operations, newest step first
      for (const step of [...executedSteps].reverse()) {
        console.log(`Compensating step: ${step.name}`);
        try {
          await step.compensate(context);
        } catch (compensationError) {
          console.error(`Compensation failed for step ${step.name}:`, compensationError);
        }
      }
      throw error;
    }
  }

  /** Adds each item's quantity back to the inventory of its product. */
  private async releaseInventory(items: any[]): Promise<void> {
    const inventoryRepo = this.productDataSource.getRepository(Inventory);
    for (const item of items) {
      await inventoryRepo.increment(
        { productId: item.productId },
        'quantity',
        item.quantity
      );
    }
  }

  /** Placeholder for a real payment call: resolves after one second. */
  private async processPaymentAsync(_payment: Payment): Promise<void> {
    // Simulate payment processing
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve();
      }, 1000);
    });
  }
}

/** One unit of saga work together with the action that undoes it. */
interface SagaStep {
  name: string;
  /** Performs the step and returns the (possibly extended) saga data. */
  execute: (data: any) => Promise<any>;
  /** Undoes the step; called only for steps whose `execute` completed. */
  compensate: (data: any) => Promise<void>;
}
2. Two-Phase Commit (2PC)
// Two-phase commit coordinator

/**
 * Coordinates a two-phase commit across registered participants.
 *
 * Phase 1 asks each participant to prepare. If any prepare fails, every
 * participant that was asked to prepare is rolled back and the prepare error
 * is rethrown. Phase 2 commits all participants; a commit failure does not
 * stop the remaining participants from committing.
 */
class TwoPhaseCommitCoordinator {
  private participants: TwoPhaseCommitParticipant[] = [];

  /** Adds a participant; participants are processed in registration order. */
  registerParticipant(participant: TwoPhaseCommitParticipant) {
    this.participants.push(participant);
  }

  /**
   * Runs prepare on all participants, then commit on all participants.
   *
   * @throws The prepare error if phase 1 fails (after rolling back), or the
   *   first commit error if phase 2 fails (after attempting every commit).
   */
  async execute(): Promise<void> {
    // Phase 1: Prepare
    console.log('Phase 1: Prepare');
    const asked: TwoPhaseCommitParticipant[] = [];
    try {
      for (const participant of this.participants) {
        // Record before the call: a participant whose prepare threw may still
        // hold partial state and is rolled back as well.
        asked.push(participant);
        await participant.prepare();
      }
    } catch (error) {
      console.error('Prepare failed, rolling back prepared participants:', error);
      // Newest first; rollback failures are logged, the prepare error wins.
      await this.rollbackEach([...asked].reverse());
      throw error;
    }

    // Phase 2: Commit
    console.log('Phase 2: Commit');
    const commitFailures: unknown[] = [];
    for (const participant of this.participants) {
      try {
        await participant.commit();
      } catch (error) {
        // The decision to commit is already made; keep going so the other
        // participants do not stay in the prepared state.
        console.error('Participant failed to commit:', error);
        commitFailures.push(error);
      }
    }
    if (commitFailures.length > 0) {
      throw commitFailures[0];
    }
  }

  /**
   * Rolls back every registered participant in registration order.
   *
   * @throws The first rollback error, after every participant was attempted.
   */
  async rollback(): Promise<void> {
    console.log('Rolling back');
    const failures = await this.rollbackEach(this.participants);
    if (failures.length > 0) {
      throw failures[0];
    }
  }

  /** Rolls back the given participants in order and returns any errors. */
  private async rollbackEach(
    participants: readonly TwoPhaseCommitParticipant[]
  ): Promise<unknown[]> {
    const failures: unknown[] = [];
    for (const participant of participants) {
      try {
        await participant.rollback();
      } catch (error) {
        console.error('Participant failed to roll back:', error);
        failures.push(error);
      }
    }
    return failures;
  }
}

/** Contract each service implements to take part in a two-phase commit. */
interface TwoPhaseCommitParticipant {
  prepare(): Promise<void>;
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

// User service participant
/** Skeleton participant for the user service; each phase only logs. */
class UserServiceParticipant implements TwoPhaseCommitParticipant {
  constructor(private dataSource: DataSource) {}

  async prepare(): Promise<void> {
    console.log('UserService: Preparing...');
    // Prepare user data
  }

  async commit(): Promise<void> {
    console.log('UserService: Committing...');
    // Commit user data
  }

  async rollback(): Promise<void> {
    console.log('UserService: Rolling back...');
    // Rollback user data
  }
}

// Order service participant
/** Skeleton participant for the order service; each phase only logs. */
class OrderServiceParticipant implements TwoPhaseCommitParticipant {
  constructor(private dataSource: DataSource) {}

  async prepare(): Promise<void> {
    console.log('OrderService: Preparing...');
    // Prepare order data
  }

  async commit(): Promise<void> {
    console.log('OrderService: Committing...');
    // Commit order data
  }

  async rollback(): Promise<void> {
    console.log('OrderService: Rolling back...');
    // Rollback order data
  }
}
Event-Driven Architecture
1. Event Bus Implementation
// Event bus

/** Shape of an event on the bus: a `type` used for routing plus any data. */
interface BusEvent {
  type: string;
  payload?: unknown;
  [key: string]: unknown;
}

/** Subscriber callback; may be synchronous or return a promise. */
type EventHandler = (event: BusEvent) => unknown;

/**
 * In-process publish/subscribe bus keyed by event type.
 *
 * Events are dispatched to the current subscribers as soon as they are
 * published and are not retained afterwards, so memory use does not grow with
 * the number of published events.
 */
class EventBus {
  private subscribers: Map<string, EventHandler[]> = new Map();

  /** Dispatches `event` to every handler subscribed to `event.type`. */
  publish(event: BusEvent) {
    console.log(`Publishing event: ${event.type}`);
    this.processEvent(event);
  }

  /** Registers `handler` for events whose `type` equals `eventType`. */
  subscribe(eventType: string, handler: EventHandler) {
    const existing = this.subscribers.get(eventType);
    if (existing) {
      existing.push(handler);
    } else {
      this.subscribers.set(eventType, [handler]);
    }
  }

  /**
   * Invokes each handler, isolating failures so one handler cannot prevent
   * the others from running. Both synchronous throws and rejected promises
   * from async handlers are logged.
   */
  private processEvent(event: BusEvent) {
    const handlers = this.subscribers.get(event.type);
    if (!handlers) {
      return;
    }
    const reportFailure = (error: unknown) => {
      console.error(`Error processing event ${event.type}:`, error);
    };
    for (const handler of [...handlers]) {
      try {
        // A try/catch alone cannot see a rejection from an async handler, so
        // attach a rejection handler to whatever the call returns.
        Promise.resolve(handler(event)).catch(reportFailure);
      } catch (error) {
        reportFailure(error);
      }
    }
  }
}

// Global event bus instance
export const eventBus = new EventBus();

// Publish events in user service
/**
 * TypeORM entity subscriber that publishes `user.created` and `user.updated`
 * events on the global event bus.
 */
@EventSubscriber()
export class UserEventPublisher implements EntitySubscriberInterface<User> {
  listenTo() {
    return User;
  }

  afterInsert(event: InsertEvent<User>) {
    eventBus.publish({
      type: 'user.created',
      payload: {
        userId: event.entity.id,
        email: event.entity.email,
        name: event.entity.name,
        timestamp: new Date(),
      },
    });
  }

  afterUpdate(event: UpdateEvent<User>) {
    // `entity` is optional on UpdateEvent; nothing to publish without it.
    const user = event.entity;
    if (!user) {
      return;
    }
    eventBus.publish({
      type: 'user.updated',
      payload: {
        userId: user.id,
        email: user.email,
        name: user.name,
        timestamp: new Date(),
      },
    });
  }
}

// Subscribe to user events in order service
eventBus.subscribe('user.created', async (event: any) => {
  console.log('OrderService received user.created event:', event);
  // Handle user creation event
  await handleUserCreated(event.payload);
});

eventBus.subscribe('user.updated', async (event: any) => {
  console.log('OrderService received user.updated event:', event);
  // Handle user update event
  await handleUserUpdated(event.payload);
});
2. Message Queue Integration
// Use RabbitMQ as message queue
import amqp from 'amqplib';

/**
 * Thin wrapper around an amqplib connection and channel for publishing JSON
 * messages to durable queues and consuming them with manual acknowledgement.
 */
class MessageQueueService {
  private connection: any;
  private channel: any;

  /**
   * Opens the connection and a channel.
   *
   * @param url Broker URL; defaults to the local broker.
   */
  async connect(url: string = 'amqp://localhost') {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
  }

  /**
   * Serializes `message` as JSON and sends it to `queue` as a persistent
   * message, declaring the queue as durable first.
   *
   * @throws Error if `connect()` has not completed.
   */
  async publish(queue: string, message: any) {
    const channel = this.requireChannel();
    await channel.assertQueue(queue, { durable: true });
    channel.sendToQueue(
      queue,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    );
  }

  /**
   * Consumes `queue`, passing each parsed JSON message to `handler`.
   * A message is acknowledged after the handler resolves.
   *
   * @throws Error if `connect()` has not completed.
   */
  async subscribe(queue: string, handler: (message: any) => unknown) {
    const channel = this.requireChannel();
    await channel.assertQueue(queue, { durable: true });
    await channel.consume(queue, async (msg: any) => {
      // amqplib invokes the callback with null when the broker cancels the
      // consumer; there is no message to parse or acknowledge.
      if (msg === null) {
        console.error(`Consumer for queue ${queue} was cancelled by the broker`);
        return;
      }

      let message: unknown;
      try {
        message = JSON.parse(msg.content.toString());
      } catch (error) {
        // A malformed body can never succeed; requeueing it would loop forever.
        console.error('Discarding malformed message:', error);
        channel.nack(msg, false, false);
        return;
      }

      try {
        await handler(message);
        channel.ack(msg);
      } catch (error) {
        console.error('Error processing message:', error);
        // NOTE(review): a message whose handler always fails is redelivered
        // indefinitely; consider a dead-letter exchange or a retry limit.
        channel.nack(msg, false, true); // Requeue
      }
    });
  }

  /** Closes the channel and then the connection, if they were opened. */
  async close() {
    try {
      if (this.channel) {
        await this.channel.close();
      }
    } finally {
      // Close the connection even when closing the channel failed.
      const connection = this.connection;
      this.channel = undefined;
      this.connection = undefined;
      if (connection) {
        await connection.close();
      }
    }
  }

  /** Returns the open channel or fails with a clear message. */
  private requireChannel(): any {
    if (!this.channel) {
      throw new Error('MessageQueueService is not connected; call connect() first');
    }
    return this.channel;
  }
}

// Publish messages in user service
const messageQueue = new MessageQueueService();
await messageQueue.connect();

/**
 * TypeORM entity subscriber that publishes a `user.created` message after a
 * `User` is inserted.
 *
 * NOTE(review): the hook runs as part of the insert, so the message may be
 * sent for a transaction that is later rolled back.
 */
@EventSubscriber()
export class UserMessagePublisher implements EntitySubscriberInterface<User> {
  listenTo() {
    return User;
  }

  async afterInsert(event: InsertEvent<User>) {
    await messageQueue.publish('user.created', {
      userId: event.entity.id,
      email: event.entity.email,
      name: event.entity.name,
      timestamp: new Date(),
    });
  }
}

// Subscribe to messages in order service
await messageQueue.subscribe('user.created', async (message: any) => {
  console.log('OrderService received user.created message:', message);
  await handleUserCreated(message);
});
Data Consistency Strategies
1. Eventual Consistency
// User data synchronization service

/**
 * Pushes user data to the order service over HTTP and retries failed
 * attempts with exponential backoff up to a fixed number of attempts.
 */
class UserDataSyncService {
  /** Total number of attempts (the first call plus retries). */
  private static readonly MAX_ATTEMPTS = 5;
  /** Delay before the first retry; doubled for each later retry. */
  private static readonly BASE_RETRY_DELAY_MS = 5000;

  /**
   * Sends the user's id, email and name to the order service.
   * Never rejects: failures are logged and handed to `scheduleRetry`.
   *
   * @param user User to synchronize.
   * @param attempt 1-based attempt number; callers normally omit it.
   */
  async syncUserToOrderService(user: User, attempt: number = 1) {
    try {
      // Call order service API
      const response = await fetch('http://order-service/api/users/sync', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          userId: user.id,
          email: user.email,
          name: user.name,
        }),
      });
      // fetch only rejects on network errors; an HTTP error status must be
      // turned into a failure explicitly or it would count as a success.
      if (!response.ok) {
        throw new Error(`Order service responded with HTTP ${response.status}`);
      }
    } catch (error) {
      // Log failure, retry later
      console.error('Failed to sync user to order service:', error);
      await this.scheduleRetry(user, attempt);
    }
  }

  /**
   * Schedules the next attempt, or gives up once the attempt limit is hit.
   *
   * @param user User to synchronize.
   * @param attempt Number of the attempt that just failed (1-based).
   */
  async scheduleRetry(user: User, attempt: number = 1) {
    if (attempt >= UserDataSyncService.MAX_ATTEMPTS) {
      console.error(
        `Giving up syncing user ${user.id} to order service after ${attempt} attempts`
      );
      return;
    }
    // 5s, 10s, 20s, ... so a longer outage is not hammered at a fixed rate.
    const delayMs = UserDataSyncService.BASE_RETRY_DELAY_MS * 2 ** (attempt - 1);
    // NOTE(review): the timer lives in process memory, so pending retries are
    // lost on restart; a durable delayed queue would survive that.
    setTimeout(() => {
      void this.syncUserToOrderService(user, attempt + 1);
    }, delayMs);
  }
}

// Use in user service
/**
 * TypeORM entity subscriber that synchronizes a `User` to the order service
 * after it is inserted or updated.
 */
@EventSubscriber()
export class UserSyncSubscriber implements EntitySubscriberInterface<User> {
  private syncService: UserDataSyncService;

  constructor() {
    this.syncService = new UserDataSyncService();
  }

  listenTo() {
    return User;
  }

  afterInsert(event: InsertEvent<User>) {
    // Fire and forget: the sync handles and logs its own failures.
    void this.syncService.syncUserToOrderService(event.entity);
  }

  afterUpdate(event: UpdateEvent<User>) {
    // `entity` is optional on UpdateEvent; nothing to sync without it.
    const user = event.entity as User | undefined;
    if (!user) {
      return;
    }
    void this.syncService.syncUserToOrderService(user);
  }
}
2. Idempotency Handling
// Idempotency handler

/**
 * Runs an operation at most once per idempotency id, tracked either in
 * process memory or in the database.
 */
class IdempotencyHandler {
  private processedIds: Set<string> = new Set();
  /** Operations currently running, so concurrent duplicates share one run. */
  private inFlight: Map<string, Promise<void>> = new Map();

  /**
   * @param dataSource Data source holding the `IdempotencyKey` entity.
   *   Required only for `processWithDatabaseIdempotency`.
   */
  constructor(private readonly dataSource?: DataSource) {}

  /**
   * Runs `operation` unless `id` was already processed by this instance.
   * A concurrent call with the same id waits for the running operation
   * instead of starting a second one.
   *
   * @throws Whatever `operation` throws; the id is then not marked processed.
   */
  async processWithIdempotency(
    id: string,
    operation: () => Promise<void>
  ): Promise<void> {
    if (this.processedIds.has(id)) {
      console.log(`Operation ${id} already processed, skipping`);
      return;
    }

    const running = this.inFlight.get(id);
    if (running) {
      console.log(`Operation ${id} already in progress, waiting for it`);
      return running;
    }

    const run = (async () => {
      await operation();
      this.processedIds.add(id);
    })();
    this.inFlight.set(id, run);
    try {
      await run;
    } finally {
      this.inFlight.delete(id);
    }
  }

  // Use database to store processed IDs
  /**
   * Runs `operation` unless an `IdempotencyKey` row for `id` exists.
   *
   * The key row is inserted first, inside a transaction that stays open while
   * `operation` runs. If the operation fails the insert is rolled back, so a
   * retry is possible.
   *
   * NOTE(review): protection against concurrent duplicates assumes
   * `IdempotencyKey.key` is a primary key or has a unique constraint. Confirm
   * against the entity definition.
   *
   * @throws Error if no data source was supplied; otherwise whatever
   *   `operation` or the database throws.
   */
  async processWithDatabaseIdempotency(
    id: string,
    operation: () => Promise<void>
  ): Promise<void> {
    const dataSource = this.dataSource;
    if (!dataSource) {
      throw new Error(
        'IdempotencyHandler requires a DataSource for database-backed idempotency'
      );
    }

    const idempotencyRepo = dataSource.getRepository(IdempotencyKey);
    const alreadyProcessed = async (): Promise<boolean> =>
      Boolean(await idempotencyRepo.findOne({ where: { key: id } }));

    if (await alreadyProcessed()) {
      console.log(`Operation ${id} already processed, skipping`);
      return;
    }

    // Becomes true once this call owns the key, which tells the catch block
    // whether a failure came from the insert or from the operation.
    let claimed = false;
    try {
      await dataSource.transaction(async (manager) => {
        await manager.insert(IdempotencyKey, {
          key: id,
          processedAt: new Date(),
        });
        claimed = true;
        await operation();
      });
    } catch (error) {
      // The insert failed: if another worker recorded the key in the
      // meantime, this is a duplicate and can be skipped.
      if (!claimed && (await alreadyProcessed())) {
        console.log(`Operation ${id} already processed, skipping`);
        return;
      }
      throw error;
    }
  }
}

// Use in message processing
// The handler needs the data source that stores IdempotencyKey rows.
// NOTE(review): orderDataSource is assumed because this consumer runs in the
// order service; pass whichever data source registers IdempotencyKey.
const idempotencyHandler = new IdempotencyHandler(orderDataSource);

await messageQueue.subscribe('user.created', async (message: any) => {
  await idempotencyHandler.processWithDatabaseIdempotency(
    `user.created.${message.userId}`,
    async () => {
      await handleUserCreated(message);
    }
  );
});
Service Discovery and Load Balancing
1. Service Registration
// Service registry

/**
 * In-memory registry of service instances with round-robin discovery.
 * Instances are identified by their host and port.
 */
class ServiceRegistry {
  private services: Map<string, ServiceInstance[]> = new Map();
  /** Next position to hand out per service, for round-robin selection. */
  private cursors: Map<string, number> = new Map();

  /**
   * Adds an instance to a service. Registering a host/port that is already
   * present replaces the existing entry instead of adding a duplicate.
   */
  register(serviceName: string, instance: ServiceInstance) {
    let instances = this.services.get(serviceName);
    if (!instances) {
      instances = [];
      this.services.set(serviceName, instances);
    }

    const existingIndex = instances.findIndex((candidate) =>
      ServiceRegistry.sameEndpoint(candidate, instance)
    );
    if (existingIndex > -1) {
      instances[existingIndex] = instance;
    } else {
      instances.push(instance);
    }
    console.log(`Registered ${serviceName} instance:`, instance);
  }

  /**
   * Removes the instance with the same host and port, if registered. The
   * argument does not have to be the same object that was registered.
   */
  unregister(serviceName: string, instance: ServiceInstance) {
    const instances = this.services.get(serviceName);
    if (!instances) {
      return;
    }
    const index = instances.findIndex((candidate) =>
      ServiceRegistry.sameEndpoint(candidate, instance)
    );
    if (index > -1) {
      instances.splice(index, 1);
      console.log(`Unregistered ${serviceName} instance:`, instance);
    }
  }

  /**
   * Returns the next instance of a service in round-robin order.
   *
   * @returns The selected instance, or `null` when none are registered.
   */
  discover(serviceName: string): ServiceInstance | null {
    const instances = this.services.get(serviceName);
    if (!instances || instances.length === 0) {
      return null;
    }

    // Round-robin load balancing. The modulo also keeps the cursor valid
    // after instances were removed.
    const index = (this.cursors.get(serviceName) ?? 0) % instances.length;
    this.cursors.set(serviceName, index + 1);
    return instances[index] ?? null;
  }

  /** Two instances are the same endpoint when host and port match. */
  private static sameEndpoint(a: ServiceInstance, b: ServiceInstance): boolean {
    return a.host === b.host && a.port === b.port;
  }
}

/** Network location of one running instance of a service. */
interface ServiceInstance {
  host: string;
  port: number;
  healthCheckUrl: string;
}

// Register on service startup
const serviceRegistry = new ServiceRegistry();

serviceRegistry.register('user-service', {
  host: 'localhost',
  port: 3001,
  healthCheckUrl: 'http://localhost:3001/health',
});

serviceRegistry.register('order-service', {
  host: 'localhost',
  port: 3002,
  healthCheckUrl: 'http://localhost:3002/health',
});
2. Health Check
// Health check service

/**
 * Probes service instances over HTTP and removes unhealthy ones from a
 * `ServiceRegistry`.
 */
class HealthCheckService {
  /**
   * Requests the instance's health check URL.
   *
   * @param instance Instance to probe.
   * @param timeoutMs Time after which the request is aborted and the instance
   *   is treated as unhealthy.
   * @returns `true` for a successful (2xx) response; `false` for any other
   *   status, a network error or a timeout.
   */
  async checkService(
    instance: ServiceInstance,
    timeoutMs: number = 5000
  ): Promise<boolean> {
    // Without a timeout a hung endpoint would stall the whole check cycle.
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), timeoutMs);
    try {
      const response = await fetch(instance.healthCheckUrl, {
        signal: controller.signal,
      });
      return response.ok;
    } catch (error) {
      console.error(`Health check failed for ${instance.host}:${instance.port}:`, error);
      return false;
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Starts a periodic check of the given services.
   *
   * NOTE(review): each cycle probes only the one instance that
   * `registry.discover` returns per service, not every registered instance.
   *
   * @param registry Registry to read instances from and remove them from.
   * @param serviceNames Services to check on every cycle.
   * @param intervalMs Time between cycles.
   * @returns The interval handle; pass it to `clearInterval` to stop checking.
   */
  async startHealthChecks(
    registry: ServiceRegistry,
    serviceNames: readonly string[] = ['user-service', 'order-service', 'product-service'],
    intervalMs: number = 30000 // Check every 30 seconds
  ): Promise<ReturnType<typeof setInterval>> {
    let cycleRunning = false;

    return setInterval(async () => {
      // Skip this tick if the previous cycle has not finished, so slow
      // checks do not pile up.
      if (cycleRunning) {
        return;
      }
      cycleRunning = true;
      try {
        for (const serviceName of serviceNames) {
          const instance = registry.discover(serviceName);
          if (!instance) {
            continue;
          }
          const isHealthy = await this.checkService(instance);
          if (!isHealthy) {
            console.log(`Service ${serviceName} is unhealthy, removing from registry`);
            registry.unregister(serviceName, instance);
          }
        }
      } catch (error) {
        // Keep the timer alive and avoid an unhandled rejection.
        console.error('Health check cycle failed:', error);
      } finally {
        cycleRunning = false;
      }
    }, intervalMs);
  }
}

// Start health checks
const healthCheckService = new HealthCheckService();
void healthCheckService.startHealthChecks(serviceRegistry);
Using TypeORM in a microservices architecture requires careful design of data management, transaction handling, and communication mechanisms to keep the system reliable and scalable.