乐闻世界logo
搜索文章和话题

NestJS 微服务和架构如何设计?

2月17日 22:33

微服务架构概述

微服务架构是一种将应用程序构建为一组小型、独立服务的方法,每个服务运行在自己的进程中,通过轻量级机制(通常是 HTTP API 或消息队列)进行通信。NestJS 提供了完整的微服务支持,使开发者能够轻松构建可扩展的分布式系统。

NestJS 微服务基础

安装依赖

bash
npm install @nestjs/microservices

创建微服务

typescript
import { NestFactory } from '@nestjs/core'; import { Transport, MicroserviceOptions } from '@nestjs/microservices'; import { AppModule } from './app.module'; async function bootstrap() { const app = await NestFactory.createMicroservice<MicroserviceOptions>( AppModule, { transport: Transport.TCP, options: { host: '127.0.0.1', port: 8877, }, }, ); await app.listen(); } bootstrap();

混合应用(HTTP + 微服务)

typescript
import { NestFactory } from '@nestjs/core'; import { Transport, MicroserviceOptions } from '@nestjs/microservices'; import { AppModule } from './app.module'; async function bootstrap() { const app = await NestFactory.create(AppModule); const microservice = app.connectMicroservice<MicroserviceOptions>({ transport: Transport.TCP, options: { host: '127.0.0.1', port: 8877, }, }); await app.startAllMicroservices(); await app.listen(3000); } bootstrap();

消息模式

1. 消息模式(Message Pattern)

typescript
import { Controller } from '@nestjs/common'; import { EventPattern, Payload, Ctx, RmqContext } from '@nestjs/microservices'; @Controller() export class MathController { @MessagePattern({ cmd: 'sum' }) accumulate(@Payload() data: number[]): number { return (data || []).reduce((a, b) => a + b, 0); } }

2. 事件模式(Event Pattern)

typescript
import { Controller } from '@nestjs/common'; import { EventPattern, Payload } from '@nestjs/microservices'; @Controller() export class NotificationController { @EventPattern('user_created') async handleUserCreated(@Payload() data: Record<string, unknown>) { // 处理用户创建事件 } }

传输层

1. TCP 传输

typescript
{ transport: Transport.TCP, options: { host: '127.0.0.1', port: 8877, }, }

2. Redis 传输

typescript
{ transport: Transport.REDIS, options: { host: 'localhost', port: 6379, }, }

3. NATS 传输

typescript
{ transport: Transport.NATS, options: { url: 'nats://localhost:4222', }, }

4. MQTT 传输

typescript
{ transport: Transport.MQTT, options: { url: 'mqtt://localhost:1883', }, }

5. RabbitMQ 传输

typescript
{ transport: Transport.RMQ, options: { urls: ['amqp://localhost:5672'], queue: 'cats_queue', queueOptions: { durable: false, }, }, }

6. Kafka 传输

typescript
{ transport: Transport.KAFKA, options: { client: { brokers: ['localhost:9092'], }, consumer: { groupId: 'my-consumer', }, }, }

客户端(Client)

创建客户端代理

typescript
import { Controller, Get } from '@nestjs/common'; import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices'; @Controller() export class AppController { private client: ClientProxy; constructor() { this.client = ClientProxyFactory.create({ transport: Transport.TCP, options: { host: '127.0.0.1', port: 8877, }, }); } @Get() async getSum() { return this.client.send({ cmd: 'sum' }, [1, 2, 3, 4, 5]); } }

使用模块配置客户端

typescript
import { Module } from '@nestjs/common'; import { ClientsModule, Transport } from '@nestjs/microservices'; @Module({ imports: [ ClientsModule.register([ { name: 'MATH_SERVICE', transport: Transport.TCP, options: { host: '127.0.0.1', port: 8877, }, }, ]), ], controllers: [AppController], }) export class AppModule {}

注入客户端

typescript
import { Controller, Get, Inject } from '@nestjs/common'; import { ClientProxy } from '@nestjs/microservices'; @Controller() export class AppController { constructor(@Inject('MATH_SERVICE') private client: ClientProxy) {} @Get() async getSum() { return this.client.send({ cmd: 'sum' }, [1, 2, 3, 4, 5]); } }

消息确认

手动确认

typescript
import { Controller } from '@nestjs/common'; import { EventPattern, Payload, Ctx, RmqContext } from '@nestjs/microservices'; @Controller() export class NotificationController { @EventPattern('notification_created') async handleNotificationCreated( @Payload() data: any, @Ctx() context: RmqContext, ) { const channel = context.getChannelRef(); const originalMsg = context.getMessage(); // 处理消息 await this.processNotification(data); // 手动确认消息 channel.ack(originalMsg); } }

配置预取计数

typescript
{ transport: Transport.RMQ, options: { urls: ['amqp://localhost:5672'], queue: 'notifications_queue', prefetchCount: 10, queueOptions: { durable: false, }, }, }

微服务架构模式

1. API 网关模式

typescript
import { Controller, Get, Param, Post, Body } from '@nestjs/common'; import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices'; @Controller('api') export class ApiController { private userService: ClientProxy; private orderService: ClientProxy; constructor() { this.userService = ClientProxyFactory.create({ transport: Transport.TCP, options: { host: '127.0.0.1', port: 8877 }, }); this.orderService = ClientProxyFactory.create({ transport: Transport.TCP, options: { host: '127.0.0.1', port: 8878 }, }); } @Get('users/:id') getUser(@Param('id') id: string) { return this.userService.send({ cmd: 'get_user' }, { id }); } @Post('orders') createOrder(@Body() orderData: any) { return this.orderService.send({ cmd: 'create_order' }, orderData); } }

2. 事件驱动架构

typescript
// 订单服务 @Controller() export class OrderController { @EventPattern('order_created') async handleOrderCreated(@Payload() order: any) { // 处理订单创建事件 await this.sendNotification(order); await this.updateInventory(order); } } // 通知服务 @Controller() export class NotificationController { @EventPattern('order_created') async handleOrderCreated(@Payload() order: any) { // 发送通知 await this.sendEmail(order.userEmail, 'Order created'); } } // 库存服务 @Controller() export class InventoryController { @EventPattern('order_created') async handleOrderCreated(@Payload() order: any) { // 更新库存 await this.reduceStock(order.items); } }

3. CQRS 模式

typescript
// Command Handler @Controller() export class OrderCommandController { @MessagePattern({ cmd: 'create_order' }) async createOrder(@Payload() command: CreateOrderCommand) { return this.commandBus.execute(command); } } // Query Handler @Controller() export class OrderQueryController { @MessagePattern({ cmd: 'get_order' }) async getOrder(@Payload() query: GetOrderQuery) { return this.queryBus.execute(query); } }

服务发现

使用 Consul

typescript
import { Module } from '@nestjs/common'; import { ClientsModule, Transport } from '@nestjs/microservices'; @Module({ imports: [ ClientsModule.register([ { name: 'USER_SERVICE', transport: Transport.TCP, options: { host: 'user-service', port: 3000, }, }, ]), ], }) export class AppModule {}

使用 Kubernetes Service Discovery

typescript
{ transport: Transport.TCP, options: { host: process.env.USER_SERVICE_HOST || 'user-service', port: parseInt(process.env.USER_SERVICE_PORT) || 3000, }, }

分布式追踪

集成 OpenTelemetry

typescript
import { Controller } from '@nestjs/common'; import { MessagePattern, Payload } from '@nestjs/microservices'; import { trace } from '@opentelemetry/api'; @Controller() export class OrderController { @MessagePattern({ cmd: 'create_order' }) async createOrder(@Payload() data: any) { const tracer = trace.getTracer('order-service'); const span = tracer.startSpan('create_order'); try { const result = await this.orderService.create(data); span.end(); return result; } catch (error) { span.recordException(error); span.end(); throw error; } } }

最佳实践

  1. 服务边界:明确定义服务边界和职责
  2. 异步通信:使用异步消息进行服务间通信
  3. 幂等性:确保操作是幂等的,可以安全重试
  4. 错误处理:实现适当的错误处理和重试机制
  5. 监控:实施全面的监控和日志记录
  6. 配置管理:使用配置中心管理服务配置
  7. 版本控制:实现 API 版本控制策略
  8. 测试:编写集成测试和契约测试

总结

NestJS 微服务和架构提供了:

  • 完整的微服务支持
  • 多种传输层选择
  • 灵活的消息模式
  • 强大的客户端代理
  • 易于构建分布式系统

掌握 NestJS 微服务架构是构建可扩展、可维护的企业级应用的关键。通过合理使用微服务模式、事件驱动架构和最佳实践,可以构建出高性能、可靠的分布式系统。微服务架构使团队能够独立开发、部署和扩展各个服务,提高开发效率和系统弹性。

标签:NestJS