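Microservice Architecture Overview
Microservice architecture structures an application as a set of small, independent services. Each service runs in its own process and communicates with the others through a lightweight mechanism, typically an HTTP API or a message queue. NestJS supports this style through the `@nestjs/microservices` package, which covers several transports as well as both request-response and event-based messaging.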
NestJS Microservice Basics
Installing Dependencies
```bash
npm install @nestjs/microservices
```
Creating a Microservice
```typescript
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  // Create an application that listens on a transport instead of HTTP.
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.TCP,
      options: {
        host: '127.0.0.1',
        port: 8877,
      },
    },
  );
  await app.listen();
}
bootstrap();
```
Hybrid Application (HTTP + Microservice)
```typescript
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  // Regular HTTP application
  const app = await NestFactory.create(AppModule);

  // Attach a TCP microservice listener to the same application
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.TCP,
    options: {
      host: '127.0.0.1',
      port: 8877,
    },
  });

  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();
```
Messaging Patterns
1. Message Pattern (request-response)
```typescript
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';

@Controller()
export class MathController {
  // Request-response handler: the return value is sent back to the caller.
  @MessagePattern({ cmd: 'sum' })
  accumulate(@Payload() data: number[]): number {
    return (data || []).reduce((a, b) => a + b, 0);
  }
}
```
2. Event Pattern
```typescript
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class NotificationController {
  // Event handler: no response is sent back to the emitter.
  @EventPattern('user_created')
  async handleUserCreated(@Payload() data: Record<string, unknown>) {
    // Handle the user-created event
  }
}
```
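The difference between the two styles can be modelled without any broker. The sketch below is a simplified in-memory illustration, not how NestJS is implemented: `MiniBus`, `patternKey`, `onMessage`, `onEvent`, `send`, and `emit` are hypothetical names chosen for this example. It assumes a message pattern has exactly one handler and yields a reply, while an event may have any number of subscribers and yields no reply (here `emit` returns only the subscriber count, for illustration).

```typescript
// Simplified in-memory model of the two messaging styles (not NestJS internals).
type Pattern = string | Record<string, unknown>;

// Serialize a pattern so object patterns such as { cmd: 'sum' } can be map keys.
// Keys are sorted so that property order does not matter.
function patternKey(pattern: Pattern): string {
  if (typeof pattern === 'string') return pattern;
  const sorted = Object.keys(pattern)
    .sort()
    .map((key) => [key, pattern[key]]);
  return JSON.stringify(sorted);
}

class MiniBus {
  private messageHandlers = new Map<string, (data: unknown) => unknown>();
  private eventHandlers = new Map<string, Array<(data: unknown) => void>>();

  // Request-response: one handler per pattern.
  onMessage(pattern: Pattern, handler: (data: unknown) => unknown): void {
    this.messageHandlers.set(patternKey(pattern), handler);
  }

  // Events: any number of subscribers per pattern.
  onEvent(pattern: Pattern, handler: (data: unknown) => void): void {
    const key = patternKey(pattern);
    const list = this.eventHandlers.get(key) ?? [];
    list.push(handler);
    this.eventHandlers.set(key, list);
  }

  // Returns the handler's reply, or throws if nobody handles the pattern.
  send(pattern: Pattern, data: unknown): unknown {
    const handler = this.messageHandlers.get(patternKey(pattern));
    if (!handler) throw new Error(`No handler for ${patternKey(pattern)}`);
    return handler(data);
  }

  // Fire-and-forget: returns the number of subscribers notified.
  emit(pattern: Pattern, data: unknown): number {
    const list = this.eventHandlers.get(patternKey(pattern)) ?? [];
    list.forEach((handler) => handler(data));
    return list.length;
  }
}

const bus = new MiniBus();
bus.onMessage({ cmd: 'sum' }, (data) =>
  (data as number[]).reduce((a, b) => a + b, 0),
);

const received: string[] = [];
bus.onEvent('user_created', () => {
  received.push('notification');
});
bus.onEvent('user_created', () => {
  received.push('audit');
});

const total = bus.send({ cmd: 'sum' }, [1, 2, 3, 4, 5]);
const notified = bus.emit('user_created', { id: 1 });
```

In this model, `send` fails loudly when no handler is registered, whereas `emit` with zero subscribers is a no-op. That mirrors why request-response suits queries that need an answer and events suit notifications that several services may react to.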
Transport Layer
1. TCP Transport
```typescript
{
  transport: Transport.TCP,
  options: {
    host: '127.0.0.1',
    port: 8877,
  },
}
```
2. Redis Transport
```typescript
{
  transport: Transport.REDIS,
  options: {
    host: 'localhost',
    port: 6379,
  },
}
```
3. NATS Transport
```typescript
{
  transport: Transport.NATS,
  options: {
    // NestJS 8 and later take a `servers` array; older releases used `url`.
    servers: ['nats://localhost:4222'],
  },
}
```
4. MQTT Transport
```typescript
{
  transport: Transport.MQTT,
  options: {
    url: 'mqtt://localhost:1883',
  },
}
```
5. RabbitMQ Transport
```typescript
{
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      durable: false,
    },
  },
}
```
6. Kafka Transport
```typescript
{
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'my-consumer',
    },
  },
}
```
Clients
Creating a Client Proxy
```typescript
import { Controller, Get } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

@Controller()
export class AppController {
  private client: ClientProxy;

  constructor() {
    this.client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: {
        host: '127.0.0.1',
        port: 8877,
      },
    });
  }

  @Get()
  async getSum() {
    // send() returns a cold Observable; Nest subscribes to it when it is
    // returned from a route handler.
    return this.client.send({ cmd: 'sum' }, [1, 2, 3, 4, 5]);
  }
}
```
Configuring a Client Through a Module
```typescript
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { AppController } from './app.controller';

@Module({
  imports: [
    ClientsModule.register([
      {
        // Injection token used later with @Inject('MATH_SERVICE')
        name: 'MATH_SERVICE',
        transport: Transport.TCP,
        options: {
          host: '127.0.0.1',
          port: 8877,
        },
      },
    ]),
  ],
  controllers: [AppController],
})
export class AppModule {}
```
Injecting the Client
```typescript
import { Controller, Get, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(@Inject('MATH_SERVICE') private client: ClientProxy) {}

  @Get()
  async getSum() {
    return this.client.send({ cmd: 'sum' }, [1, 2, 3, 4, 5]);
  }
}
```
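Message Acknowledgement
Manual Acknowledgement
```typescript
import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Ctx, RmqContext } from '@nestjs/microservices';

@Controller()
export class NotificationController {
  // Manual acknowledgement requires `noAck: false` in the RMQ transport options.
  @EventPattern('notification_created')
  async handleNotificationCreated(
    @Payload() data: any,
    @Ctx() context: RmqContext,
  ) {
    const channel = context.getChannelRef();
    const originalMsg = context.getMessage();

    // Process the message
    await this.processNotification(data);

    // Acknowledge the message manually
    channel.ack(originalMsg);
  }

  // Placeholder: the real processing logic goes here.
  private async processNotification(data: any): Promise<void> {}
}
```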
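Configuring the Prefetch Count
```typescript
{
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'notifications_queue',
    // Prefetch limits unacknowledged messages per consumer, so it is only
    // meaningful with manual acknowledgement enabled.
    noAck: false,
    prefetchCount: 10,
    queueOptions: {
      durable: false,
    },
  },
}
```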
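With manual acknowledgement, a handler that throws before calling `ack` leaves the message unacknowledged. One common approach, sketched below, is to acknowledge on success and reject on failure. This is a minimal illustration under stated assumptions: `AckChannel`, `processWithAck`, and `fakeChannel` are hypothetical names, the interface only mirrors the `ack`/`nack` call shapes of an AMQP channel, and the handler is synchronous to keep the example short (real handlers are usually asynchronous).

```typescript
// Minimal subset of a channel API, mirroring AMQP-style ack/nack calls.
interface AckChannel<M> {
  ack(message: M): void;
  nack(message: M, allUpTo?: boolean, requeue?: boolean): void;
}

// Run the handler, then acknowledge on success or reject on failure.
// Returns true when the message was acknowledged.
function processWithAck<M>(
  channel: AckChannel<M>,
  message: M,
  handler: (message: M) => void,
): boolean {
  try {
    handler(message);
    channel.ack(message);
    return true;
  } catch {
    // requeue = false: the broker does not redeliver the message. If the
    // queue has a dead-letter exchange configured, the message goes there.
    channel.nack(message, false, false);
    return false;
  }
}

// Fake channel that records calls, used only to exercise the sketch.
const ackLog: string[] = [];
const fakeChannel: AckChannel<string> = {
  ack: (message) => {
    ackLog.push(`ack:${message}`);
  },
  nack: (message, _allUpTo, requeue) => {
    ackLog.push(`nack:${message}:requeue=${requeue}`);
  },
};

const okResult = processWithAck(fakeChannel, 'm1', () => {});
const failedResult = processWithAck(fakeChannel, 'm2', () => {
  throw new Error('processing failed');
});
```

Rejecting without requeue avoids an endless redelivery loop for a message that can never be processed. Whether to requeue instead is a design choice that depends on whether failures are expected to be transient.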
Microservice Architecture Patterns
1. API Gateway Pattern
```typescript
import { Controller, Get, Param, Post, Body } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

@Controller('api')
export class ApiController {
  private userService: ClientProxy;
  private orderService: ClientProxy;

  constructor() {
    // One client proxy per downstream service
    this.userService = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: { host: '127.0.0.1', port: 8877 },
    });
    this.orderService = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: { host: '127.0.0.1', port: 8878 },
    });
  }

  // Forward HTTP requests to the matching microservice
  @Get('users/:id')
  getUser(@Param('id') id: string) {
    return this.userService.send({ cmd: 'get_user' }, { id });
  }

  @Post('orders')
  createOrder(@Body() orderData: any) {
    return this.orderService.send({ cmd: 'create_order' }, orderData);
  }
}
```
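2. Event-Driven Architecture
```typescript
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

// Each controller below belongs to a separate service. Helper methods such as
// sendNotification, sendEmail, and reduceStock are assumed to be defined elsewhere.

// Order service
@Controller()
export class OrderController {
  @EventPattern('order_created')
  async handleOrderCreated(@Payload() order: any) {
    // Handle the order-created event
    await this.sendNotification(order);
    await this.updateInventory(order);
  }
}

// Notification service
@Controller()
export class NotificationController {
  @EventPattern('order_created')
  async handleOrderCreated(@Payload() order: any) {
    // Send a notification
    await this.sendEmail(order.userEmail, 'Order created');
  }
}

// Inventory service
@Controller()
export class InventoryController {
  @EventPattern('order_created')
  async handleOrderCreated(@Payload() order: any) {
    // Update the stock
    await this.reduceStock(order.items);
  }
}
```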
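3. CQRS Pattern
```typescript
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
// Assumption: the buses come from @nestjs/cqrs; CreateOrderCommand and
// GetOrderQuery are application classes defined elsewhere.
import { CommandBus, QueryBus } from '@nestjs/cqrs';

// Command handler: write side
@Controller()
export class OrderCommandController {
  constructor(private readonly commandBus: CommandBus) {}

  @MessagePattern({ cmd: 'create_order' })
  async createOrder(@Payload() command: CreateOrderCommand) {
    return this.commandBus.execute(command);
  }
}

// Query handler: read side
@Controller()
export class OrderQueryController {
  constructor(private readonly queryBus: QueryBus) {}

  @MessagePattern({ cmd: 'get_order' })
  async getOrder(@Payload() query: GetOrderQuery) {
    return this.queryBus.execute(query);
  }
}
```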
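The separation that CQRS aims for can be shown without any framework. The following is a rough sketch, not the NestJS CQRS module: the field names on `CreateOrderCommand` and `GetOrderQuery`, the `OrderSummary` view, and both handler functions are assumptions made up for this example. The command side validates and changes state and returns only an identifier, while the query side reads from a view and never mutates it.

```typescript
// Hypothetical shapes for the command, the query, and the read-side view.
interface CreateOrderCommand {
  userId: string;
  items: string[];
}
interface GetOrderQuery {
  orderId: string;
}
interface OrderSummary {
  orderId: string;
  userId: string;
  itemCount: number;
}

// Read model: a denormalized view kept in memory for illustration only.
const orderSummaries = new Map<string, OrderSummary>();
let nextOrderNumber = 1;

// Command side: validates input, changes state, returns only the new id.
function handleCreateOrder(command: CreateOrderCommand): string {
  if (command.items.length === 0) {
    throw new Error('An order needs at least one item');
  }
  const orderId = `order-${nextOrderNumber++}`;
  orderSummaries.set(orderId, {
    orderId,
    userId: command.userId,
    itemCount: command.items.length,
  });
  return orderId;
}

// Query side: reads from the view and never mutates it.
function handleGetOrder(query: GetOrderQuery): OrderSummary | undefined {
  return orderSummaries.get(query.orderId);
}

const createdOrderId = handleCreateOrder({ userId: 'u1', items: ['a', 'b'] });
const createdOrder = handleGetOrder({ orderId: createdOrderId });
```

In a larger system the read model would typically live in its own store and be updated from events, which is what allows the two sides to scale independently.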
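Service Discovery
Using Consul
```typescript
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'USER_SERVICE',
        transport: Transport.TCP,
        options: {
          // Logical service name rather than a fixed IP address. This snippet
          // contains no Consul-specific code: the name must be resolvable in
          // the runtime environment, for example through Consul's DNS interface.
          host: 'user-service',
          port: 3000,
        },
      },
    ]),
  ],
})
export class AppModule {}
```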
Using Kubernetes Service Discovery
```typescript
{
  transport: Transport.TCP,
  options: {
    host: process.env.USER_SERVICE_HOST || 'user-service',
    // Fall back to 3000 when the variable is unset or not a number.
    port: parseInt(process.env.USER_SERVICE_PORT ?? '', 10) || 3000,
  },
}
```
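Silently falling back to a default can hide a misconfigured port. A stricter alternative, sketched below, falls back only when the variable is absent and fails fast when it is present but invalid. `parsePort` and `tcpOptionsFromEnv` are hypothetical helpers written for this example; the environment is passed in as a plain object so the functions stay easy to test.

```typescript
// Hypothetical helper: parse a TCP port from an environment-style string.
function parsePort(raw: string | undefined, fallback: number): number {
  // Absent or blank: use the fallback.
  if (raw === undefined || raw.trim() === '') return fallback;
  const port = Number(raw);
  // Reject non-integers and values outside the valid TCP port range.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid port: "${raw}"`);
  }
  return port;
}

// Build TCP client options from a key/value map such as process.env.
function tcpOptionsFromEnv(env: Record<string, string | undefined>) {
  return {
    host: env.USER_SERVICE_HOST || 'user-service',
    port: parsePort(env.USER_SERVICE_PORT, 3000),
  };
}

const defaultOptions = tcpOptionsFromEnv({});
const customOptions = tcpOptionsFromEnv({
  USER_SERVICE_HOST: 'users.internal',
  USER_SERVICE_PORT: '8080',
});
```

Passing `process.env` to `tcpOptionsFromEnv` at startup would surface a typo such as `USER_SERVICE_PORT=80a80` as an immediate error instead of a connection to the wrong port.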
Distributed Tracing
Integrating OpenTelemetry
```typescript
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { trace, SpanStatusCode } from '@opentelemetry/api';

@Controller()
export class OrderController {
  // `orderService` is assumed to be an injected application service.
  @MessagePattern({ cmd: 'create_order' })
  async createOrder(@Payload() data: any) {
    const tracer = trace.getTracer('order-service');
    const span = tracer.startSpan('create_order');
    try {
      return await this.orderService.create(data);
    } catch (error) {
      // Record the failure on the span before rethrowing.
      span.recordException(error as Error);
      span.setStatus({ code: SpanStatusCode.ERROR });
      throw error;
    } finally {
      // End the span on both the success and the error path.
      span.end();
    }
  }
}
```
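Best Practices
- Service boundaries: define each service's boundary and responsibilities explicitly.
- Asynchronous communication: prefer asynchronous messaging between services.
- Idempotency: make operations idempotent so that they can be retried safely.
- Error handling: implement appropriate error handling and retry mechanisms.
- Monitoring: put comprehensive monitoring and logging in place.
- Configuration management: manage service configuration through a configuration center.
- Versioning: adopt an API versioning strategy.
- Testing: write integration tests and contract tests.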
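Idempotency is the practice in this list that most directly affects handler code, because brokers may redeliver a message and clients may retry. A minimal sketch of an idempotent consumer follows. It assumes every message carries a unique `id` set by the producer; `Message`, `IdempotentConsumer`, and the in-memory `Set` are illustrative choices, not part of NestJS.

```typescript
// Each message carries a unique id; a message whose id was already
// processed is skipped instead of being applied a second time.
interface Message<T> {
  id: string; // hypothetical unique identifier set by the producer
  payload: T;
}

class IdempotentConsumer<T> {
  // In-memory store for illustration only. A real service would need a
  // shared, persistent store so deduplication survives restarts.
  private processed = new Set<string>();
  private handler: (payload: T) => void;

  constructor(handler: (payload: T) => void) {
    this.handler = handler;
  }

  // Returns true if the handler ran, false if the message was a duplicate.
  handle(message: Message<T>): boolean {
    if (this.processed.has(message.id)) return false;
    this.handler(message.payload);
    // Mark as processed only after the handler succeeded, so that a
    // failed attempt can still be retried.
    this.processed.add(message.id);
    return true;
  }
}

let stock = 10;
const stockConsumer = new IdempotentConsumer<{ quantity: number }>((order) => {
  stock -= order.quantity;
});

const firstDelivery = stockConsumer.handle({ id: 'order-1', payload: { quantity: 3 } });
const redelivery = stockConsumer.handle({ id: 'order-1', payload: { quantity: 3 } });
```

The redelivered message is ignored, so the stock is reduced once. Without the check, a retry of the same `order_created` event would reduce it twice.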
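Summary
NestJS microservices provide:
- Built-in microservice support
- A choice of transport layers
- Flexible messaging patterns
- A client proxy for calling other services
- A straightforward path to building distributed systems
Understanding these building blocks is central to building scalable, maintainable enterprise applications with NestJS. Combined with suitable microservice patterns, event-driven design, and the best practices listed earlier, they let teams develop, deploy, and scale each service independently, which improves both development efficiency and system resilience.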