面试题手册

梳理高频技术问题,帮助你按主题复习和查漏补缺。

服务端阅读 02月18日 22:20

TypeORM 的事件系统如何工作?包括实体监听器和订阅者

TypeORM 的事件系统允许开发者在实体操作的生命周期中执行自定义逻辑,提供了强大的扩展能力。事件类型1. 实体生命周期事件TypeORM 提供了以下实体生命周期事件:BeforeInsert - 在实体插入之前触发AfterInsert - 在实体插入之后触发BeforeUpdate - 在实体更新之前触发AfterUpdate - 在实体更新之后触发BeforeRemove - 在实体删除之前触发AfterRemove - 在实体删除之后触发BeforeSoftRemove - 在实体软删除之前触发AfterSoftRemove - 在实体软删除之后触发BeforeRecover - 在实体恢复之前触发AfterRecover - 在实体恢复之后触发2. 订阅者事件订阅者可以监听所有实体的特定事件。使用实体监听器基本用法import { Entity, PrimaryGeneratedColumn, Column, BeforeInsert, BeforeUpdate, AfterInsert, AfterUpdate } from 'typeorm';@Entity()export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @Column() email: string; @Column({ type: 'timestamp' }) createdAt: Date; @Column({ type: 'timestamp' }) updatedAt: Date; @Column({ default: 0 }) version: number; @BeforeInsert() beforeInsert() { this.createdAt = new Date(); this.updatedAt = new Date(); this.version = 1; } @BeforeUpdate() beforeUpdate() { this.updatedAt = new Date(); this.version++; } @AfterInsert() afterInsert() { console.log(`User ${this.name} inserted with ID ${this.id}`); } @AfterUpdate() afterUpdate() { console.log(`User ${this.name} updated to version ${this.version}`); }}复杂逻辑处理import { Entity, PrimaryGeneratedColumn, Column, BeforeInsert, BeforeUpdate } from 'typeorm';import { hash } from 'bcrypt';@Entity()export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @Column() email: string; @Column() password: string; @Column({ default: false }) emailVerified: boolean; @Column({ type: 'timestamp', nullable: true }) emailVerifiedAt: Date; @Column({ type: 'timestamp' }) createdAt: Date; @Column({ type: 'timestamp' }) updatedAt: Date; @BeforeInsert() async beforeInsert() { this.createdAt = new Date(); this.updatedAt = new Date(); // 加密密码 if (this.password) { this.password = await hash(this.password, 10); } // 验证邮箱格式 if (!this.validateEmail(this.email)) { throw new Error('Invalid email format'); } } @BeforeUpdate() async beforeUpdate() { this.updatedAt = new Date(); // 如果密码被修改,重新加密 if (this.password && 
this.isPasswordModified()) { this.password = await hash(this.password, 10); } // 如果邮箱被验证,记录验证时间 if (this.emailVerified && !this.emailVerifiedAt) { this.emailVerifiedAt = new Date(); } } private validateEmail(email: string): boolean { const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/; return emailRegex.test(email); } private isPasswordModified(): boolean { // 实现密码修改检测逻辑 return true; }}使用订阅者基本订阅者import { EntitySubscriberInterface, EventSubscriber, InsertEvent, UpdateEvent, RemoveEvent } from 'typeorm';import { User } from '../entity/User';@EventSubscriber()export class UserSubscriber implements EntitySubscriberInterface<User> { // 指定监听的实体 listenTo() { return User; } // 插入前 beforeInsert(event: InsertEvent<User>) { console.log(`Before inserting user: ${event.entity.name}`); // 可以修改实体 event.entity.createdAt = new Date(); } // 插入后 afterInsert(event: InsertEvent<User>) { console.log(`After inserting user with ID: ${event.entity.id}`); // 发送欢迎邮件 this.sendWelcomeEmail(event.entity); } // 更新前 beforeUpdate(event: UpdateEvent<User>) { console.log(`Before updating user: ${event.entity.name}`); // 记录变更 this.logChanges(event); } // 更新后 afterUpdate(event: UpdateEvent<User>) { console.log(`After updating user: ${event.entity.name}`); // 发送通知 this.sendUpdateNotification(event.entity); } // 删除前 beforeRemove(event: RemoveEvent<User>) { console.log(`Before removing user: ${event.entity.name}`); // 检查是否可以删除 if (event.entity.posts && event.entity.posts.length > 0) { throw new Error('Cannot delete user with posts'); } } // 删除后 afterRemove(event: RemoveEvent<User>) { console.log(`After removing user: ${event.entity.name}`); // 清理相关数据 this.cleanupUserData(event.entity.id); } private sendWelcomeEmail(user: User) { // 发送欢迎邮件逻辑 console.log(`Sending welcome email to ${user.email}`); } private sendUpdateNotification(user: User) { // 发送更新通知逻辑 console.log(`Sending update notification to ${user.email}`); } private logChanges(event: UpdateEvent<User>) { // 记录变更逻辑 console.log('Changes:', 
event.updatedColumns); } private cleanupUserData(userId: number) { // 清理用户数据逻辑 console.log(`Cleaning up data for user ${userId}`); }}全局订阅者import { EntitySubscriberInterface, EventSubscriber, InsertEvent } from 'typeorm';@EventSubscriber()export class AuditSubscriber implements EntitySubscriberInterface { // 监听所有实体 listenTo() { return Object; } // 所有实体的插入操作 afterInsert(event: InsertEvent<any>) { console.log(`Entity ${event.metadata.name} inserted with ID ${event.entity.id}`); // 记录审计日志 this.logAudit({ action: 'INSERT', entity: event.metadata.name, entityId: event.entity.id, timestamp: new Date(), }); } private logAudit(log: any) { // 记录审计日志逻辑 console.log('Audit log:', log); }}注册订阅者在 DataSource 中注册import { DataSource } from 'typeorm';import { UserSubscriber } from './subscriber/UserSubscriber';import { AuditSubscriber } from './subscriber/AuditSubscriber';const dataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: [User, Post], synchronize: false, logging: true, // 注册订阅者 subscribers: [UserSubscriber, AuditSubscriber],});动态注册订阅者import { DataSource } from 'typeorm';const dataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: [User, Post], synchronize: false, logging: true,});// 初始化后动态注册订阅者dataSource.initialize().then(() => { const userSubscriber = new UserSubscriber(); dataSource.subscribers.push(userSubscriber);});高级事件处理事务中的事件@EventSubscriber()export class TransactionSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } afterInsert(event: InsertEvent<User>) { // 检查是否在事务中 if (event.queryRunner?.isTransactionActive) { console.log('Insert operation is part of a transaction'); } // 使用事务执行器 if (event.queryRunner) { event.queryRunner.manager.getRepository(AuditLog).save({ action: 'USER_INSERT', userId: event.entity.id, timestamp: new Date(), }); } 
}}异步事件处理@EventSubscriber()export class AsyncSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { // 异步发送邮件 await this.sendEmailAsync(event.entity); // 异步生成用户资料 await this.generateUserProfileAsync(event.entity); } private async sendEmailAsync(user: User) { // 模拟异步邮件发送 return new Promise((resolve) => { setTimeout(() => { console.log(`Email sent to ${user.email}`); resolve(null); }, 1000); }); } private async generateUserProfileAsync(user: User) { // 模拟异步用户资料生成 return new Promise((resolve) => { setTimeout(() => { console.log(`Profile generated for user ${user.id}`); resolve(null); }, 500); }); }}条件事件处理@EventSubscriber()export class ConditionalSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } beforeUpdate(event: UpdateEvent<User>) { // 只在特定条件下执行 if (this.shouldProcessUpdate(event)) { this.processUpdate(event); } } private shouldProcessUpdate(event: UpdateEvent<User>): boolean { // 检查是否更新了特定字段 const updatedFields = event.updatedColumns.map(col => col.propertyName); return updatedFields.includes('email') || updatedFields.includes('password'); } private processUpdate(event: UpdateEvent<User>) { // 处理更新逻辑 console.log('Processing critical update:', event.entity); }}事件最佳实践1. 
保持事件处理简单// ✅ 好的做法:事件处理简单直接@EventSubscriber()export class SimpleSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } afterInsert(event: InsertEvent<User>) { // 简单的日志记录 console.log(`User created: ${event.entity.name}`); }}// ❌ 不好的做法:事件处理过于复杂@EventSubscriber()export class ComplexSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { // 复杂的业务逻辑 const user = event.entity; // 发送邮件 await this.sendEmail(user); // 创建用户资料 await this.createProfile(user); // 初始化用户设置 await this.initializeSettings(user); // 发送欢迎消息 await this.sendWelcomeMessage(user); // 记录统计 await this.recordStatistics(user); // 更新缓存 await this.updateCache(user); // 触发其他事件 await this.triggerEvents(user); }}2. 避免循环事件// ✅ 好的做法:避免循环事件@EventSubscriber()export class SafeSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { // 使用标志位避免循环 if (event.entity.processed) { return; } // 处理逻辑 await this.processUser(event.entity); // 标记为已处理 event.entity.processed = true; }}// ❌ 不好的做法:可能导致循环事件@EventSubscriber()export class CircularSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { // 更新用户,可能触发 afterUpdate 事件 await event.manager.save(User, { id: event.entity.id, processed: true, }); }}3. 错误处理// ✅ 好的做法:适当的错误处理@EventSubscriber()export class ErrorHandlingSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { try { await this.sendWelcomeEmail(event.entity); } catch (error) { console.error('Failed to send welcome email:', error); // 记录错误,但不影响主流程 await this.logError(error, event.entity); } } private async logError(error: any, user: User) { // 记录错误到数据库 await event.manager.getRepository(ErrorLog).save({ error: error.message, userId: user.id, timestamp: new Date(), }); }}4. 
性能考虑// ✅ 好的做法:批量处理@EventSubscriber()export class BatchSubscriber implements EntitySubscriberInterface<User> { private batch: User[] = []; private timer: NodeJS.Timeout | null = null; listenTo() { return User; } afterInsert(event: InsertEvent<User>) { // 添加到批次 this.batch.push(event.entity); // 设置定时器 if (!this.timer) { this.timer = setTimeout(() => { this.processBatch(); }, 1000); // 1 秒后处理 } } private async processBatch() { if (this.batch.length === 0) { return; } const usersToProcess = [...this.batch]; this.batch = []; this.timer = null; // 批量处理 await this.sendBatchNotifications(usersToProcess); } private async sendBatchNotifications(users: User[]) { console.log(`Sending notifications to ${users.length} users`); // 批量发送通知逻辑 }}实际应用场景1. 审计日志@EventSubscriber()export class AuditLogSubscriber implements EntitySubscriberInterface { listenTo() { return Object; } afterInsert(event: InsertEvent<any>) { this.logAudit('INSERT', event.entity); } afterUpdate(event: UpdateEvent<any>) { this.logAudit('UPDATE', event.entity, event.updatedColumns); } afterRemove(event: RemoveEvent<any>) { this.logAudit('DELETE', event.entity); } private async logAudit(action: string, entity: any, columns?: any[]) { const auditLog = { action, entityName: entity.constructor.name, entityId: entity.id, changes: columns ? columns.map(col => col.propertyName) : null, timestamp: new Date(), }; await event.manager.getRepository(AuditLog).save(auditLog); }}注意:上面 AuditLogSubscriber.logAudit 中引用的 event 并不在该方法的作用域内,代码无法通过编译。上文 ErrorHandlingSubscriber.logError 也有同样的问题。应把 event.manager 作为参数传入,并在各回调中 return/await 它返回的 Promise,否则写入失败不会被感知。另外,listenTo() 返回 Object 时,AuditLog 实体自身的插入也会触发 afterInsert,因此必须跳过 AuditLog(例如判断 event.metadata.target === AuditLog),否则会无限递归地写审计日志。通过 update()/delete() 触发时,event.entity 可能为 undefined,实体名应优先取 event.metadata.name。2.
缓存失效@EventSubscriber()export class CacheInvalidationSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } afterUpdate(event: UpdateEvent<User>) { // 清除用户缓存 this.clearUserCache(event.entity.id); // 清除相关缓存 this.clearRelatedCache(event.entity.id); } afterRemove(event: RemoveEvent<User>) { // 清除所有相关缓存 this.clearAllUserCache(event.entity.id); } private clearUserCache(userId: number) { // 清除用户缓存逻辑 console.log(`Clearing cache for user ${userId}`); } private clearRelatedCache(userId: number) { // 清除相关缓存逻辑 console.log(`Clearing related cache for user ${userId}`); } private clearAllUserCache(userId: number) { // 清除所有用户缓存逻辑 console.log(`Clearing all cache for user ${userId}`); }}3. 通知系统@EventSubscriber()export class NotificationSubscriber implements EntitySubscriberInterface<Post> { listenTo() { return Post; } afterInsert(event: InsertEvent<Post>) { // 通知关注者 this.notifyFollowers(event.entity); // 通知作者 this.notifyAuthor(event.entity); } afterUpdate(event: UpdateEvent<Post>) { // 如果文章被发布,通知关注者 if (this.isPublished(event)) { this.notifyFollowers(event.entity); } } private isPublished(event: UpdateEvent<Post>): boolean { const updatedFields = event.updatedColumns.map(col => col.propertyName); return updatedFields.includes('status') && event.entity.status === 'published'; } private async notifyFollowers(post: Post) { // 通知关注者逻辑 console.log(`Notifying followers of post ${post.id}`); } private async notifyAuthor(post: Post) { // 通知作者逻辑 console.log(`Notifying author of post ${post.id}`); }}使用事件时还需注意两点。1) 实体监听器(@BeforeInsert、@BeforeUpdate 等)只在通过 save()/remove() 等方法操作实体实例时触发,使用 repository.update()/delete() 或 QueryBuilder 时不会触发。订阅者此时虽然能收到事件,但 event.entity 可能为 undefined 或只包含部分字段,上面示例中直接访问 event.entity.id 的写法需要先判空。2) 订阅者回调与触发它的写操作运行在同一个事务中,回调中抛出的异常会导致整个操作回滚。发送邮件、清理缓存、推送通知等外部副作用,最好在 afterTransactionCommit 中执行(或交给消息队列异步处理),以免事务回滚后副作用已经发生。TypeORM 的事件系统提供了强大的扩展能力,合理使用事件可以简化业务逻辑,提高代码的可维护性。
服务端阅读 02月18日 22:20

TypeORM 中如何定义和使用关系映射?包括一对一、一对多、多对多关系的详细配置

TypeORM 提供了四种主要的关系映射类型,每种关系都有其特定的使用场景和配置方式。理解这些关系映射对于构建复杂的数据模型至关重要。四种关系类型1. One-to-One (一对一)一对一关系表示两个实体之间存在唯一的对应关系。例如,一个用户只能有一个个人资料。@Entity()export class Profile { @PrimaryGeneratedColumn() id: number; @Column() gender: string; @Column() bio: string; @OneToOne(() => User, user => user.profile) @JoinColumn() user: User;}@Entity()export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @OneToOne(() => Profile, profile => profile.user, { cascade: true }) profile: Profile;}关键点:使用 @OneToOne() 装饰器定义关系在拥有方使用 @JoinColumn() 指定外键列(只在需要保存外键的那一侧添加,外键列就建在该侧的表中)cascade: true 允许级联操作(保存、删除等)2. One-to-Many / Many-to-One (一对多/多对一)一对多关系是最常见的关系类型。例如,一个用户可以发表多篇文章。@Entity()export class Post { @PrimaryGeneratedColumn() id: number; @Column() title: string; @Column() content: string; @ManyToOne(() => User, user => user.posts) author: User;}@Entity()export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @OneToMany(() => Post, post => post.author) posts: Post[];}关键点:@ManyToOne() 放在"多"的一方,包含外键@OneToMany() 放在"一"的一方,不需要 @JoinColumn()外键自动添加到"多"的一方表中@OneToMany() 不能单独存在,必须在对方实体上定义对应的 @ManyToOne(),而 @ManyToOne() 可以单独使用3. 
Many-to-Many (多对多)多对多关系需要中间表来连接两个实体。例如,一篇文章可以有多个标签,一个标签也可以属于多篇文章。@Entity()export class Tag { @PrimaryGeneratedColumn() id: number; @Column() name: string; @ManyToMany(() => Post, post => post.tags) posts: Post[];}@Entity()export class Post { @PrimaryGeneratedColumn() id: number; @Column() title: string; @ManyToMany(() => Tag, tag => tag.posts, { cascade: true }) @JoinTable() tags: Tag[];}关键点:使用 @JoinTable() 在关系的一方定义中间表中间表自动创建,包含两个外键可以自定义中间表名称和列名关系配置选项Eager 和 Lazy 加载@Entity()export class User { @OneToMany(() => Post, post => post.author, { eager: true // 立即加载关联数据 }) posts: Post[];}// 或者使用懒加载@Entity()export class User { @OneToMany(() => Post, post => post.author) posts: Promise<Post[]>;}级联操作 (Cascade)@OneToMany(() => Post, post => post.author, { cascade: ['insert', 'update', 'remove', 'soft-remove', 'recover']})posts: Post[];级联操作选项:insert: 保存父实体时自动保存子实体update: 更新父实体时自动更新子实体remove: 删除父实体时自动删除子实体soft-remove: 软删除recover: 恢复软删除的实体OnDelete 和 OnUpdate@ManyToOne(() => User, user => user.posts, { onDelete: 'CASCADE', // 删除用户时级联删除文章 onUpdate: 'CASCADE' // 更新用户ID时级联更新文章})author: User;关系查询使用 FindOptions 查询关联数据const userRepository = dataSource.getRepository(User);// 加载关联数据const users = await userRepository.find({ relations: ['posts', 'profile']});// 条件查询关联数据const usersWithPosts = await userRepository.find({ relations: { posts: true }, where: { posts: { title: Like('%TypeORM%') } }});使用 QueryBuilderconst users = await dataSource .getRepository(User) .createQueryBuilder('user') .leftJoinAndSelect('user.posts', 'post') .leftJoinAndSelect('user.profile', 'profile') .where('post.title = :title', { title: 'TypeORM Guide' }) .getMany();自定义关系自定义 JoinTable@ManyToMany(() => Tag, tag => tag.posts)@JoinTable({ name: 'post_tags', joinColumn: { name: 'postId', referencedColumnName: 'id' }, inverseJoinColumn: { name: 'tagId', referencedColumnName: 'id' }})tags: Tag[];自定义 JoinColumn@ManyToOne(() => User, user => user.posts)@JoinColumn({ name: 'author_id', referencedColumnName: 'id'})author: 
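User;最佳实践合理选择关系类型: 根据业务需求选择最合适的关系类型避免过度使用 Eager 加载: eager 关系在每次 find* 查询时都会通过 JOIN 自动加载,容易一次取回大量用不到的数据(且 eager 对 QueryBuilder 不生效);而在循环中逐个访问懒加载(lazy)关系才是 N+1 查询问题的常见来源谨慎使用级联删除: 确保不会意外删除重要数据使用索引优化查询: 为外键列添加索引(PostgreSQL 不会自动为外键列建索引,MySQL InnoDB 会自动创建)考虑性能影响: 复杂关系查询可能影响性能TypeORM 的关系映射系统提供了强大而灵活的方式来处理实体之间的关系,掌握这些概念对于构建高效、可维护的应用程序至关重要。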
服务端阅读 02月18日 22:19

TypeORM 的 QueryBuilder 如何使用?包括复杂查询、关联查询、分页排序等高级功能

QueryBuilder 是 TypeORM 中最强大、最灵活的查询工具,它允许开发者构建复杂的 SQL 查询,同时保持类型安全和可读性。QueryBuilder 基础用法创建 QueryBuilderimport { DataSource } from 'typeorm';const dataSource = new DataSource(/* 配置 */);// 方式1: 通过 Repository 创建const userRepository = dataSource.getRepository(User);const queryBuilder = userRepository.createQueryBuilder('user');// 方式2: 通过 DataSource 创建const queryBuilder = dataSource.createQueryBuilder(User, 'user');基本查询操作// 查询所有用户const users = await dataSource .createQueryBuilder('user') .getMany();// 查询单个用户const user = await dataSource .createQueryBuilder('user') .where('user.id = :id', { id: 1 }) .getOne();// 查询并计数const count = await dataSource .createQueryBuilder('user') .getCount();条件查询Where 子句// 简单条件const users = await dataSource .createQueryBuilder('user') .where('user.age > :age', { age: 18 }) .getMany();// 多个条件 (AND)const users = await dataSource .createQueryBuilder('user') .where('user.age > :age', { age: 18 }) .andWhere('user.isActive = :isActive', { isActive: true }) .getMany();// OR 条件const users = await dataSource .createQueryBuilder('user') .where('user.age > :age', { age: 18 }) .orWhere('user.role = :role', { role: 'admin' }) .getMany();// 复杂条件组合const users = await dataSource .createQueryBuilder('user') .where( new Brackets(qb => { qb.where('user.age > :age', { age: 18 }) .orWhere('user.role = :role', { role: 'admin' }); }) ) .andWhere('user.isActive = :isActive', { isActive: true }) .getMany();操作符import { Like, Between, In, MoreThan, LessThan } from 'typeorm';// LIKE 查询const users = await dataSource .createQueryBuilder('user') .where('user.name LIKE :name', { name: '%John%' }) .getMany();// 或者使用 Like 操作符const users = await dataSource .createQueryBuilder('user') .where('user.name = :name', { name: Like('%John%') }) .getMany();// BETWEEN 查询const users = await dataSource .createQueryBuilder('user') .where('user.age = :age', { age: Between(18, 30) }) .getMany();// IN 查询const users = await dataSource .createQueryBuilder('user') .where('user.id IN :ids', { ids: 
[1, 2, 3] }) .getMany();// 或者使用 In 操作符const users = await dataSource .createQueryBuilder('user') .where('user.id = :ids', { ids: In([1, 2, 3]) }) .getMany();// 比较操作符const users = await dataSource .createQueryBuilder('user') .where('user.age = :age', { age: MoreThan(18) }) .andWhere('user.score = :score', { score: LessThan(100) }) .getMany();注意:Like、Between、In、MoreThan、LessThan 等是供 find*/FindOptions 使用的 FindOperator。把它们作为 QueryBuilder 的参数值传入,并不会生成对应的 LIKE/BETWEEN/IN/> 条件,因此上面"或者使用 ... 操作符"的写法不可取。同样,'user.id IN :ids' 缺少括号和数组展开。在 QueryBuilder 中应直接书写 SQL 条件,例如 .where('user.name LIKE :name', { name: '%John%' })、.where('user.age BETWEEN :min AND :max', { min: 18, max: 30 })、.where('user.id IN (:...ids)', { ids: [1, 2, 3] })、.where('user.age > :age', { age: 18 })。关联查询Left Join 和 Inner Join// Left Join (包含没有关联数据的记录)const users = await dataSource .createQueryBuilder('user') .leftJoinAndSelect('user.posts', 'post') .where('user.id = :id', { id: 1 }) .getMany();// Inner Join (只包含有关联数据的记录)const users = await dataSource .createQueryBuilder('user') .innerJoinAndSelect('user.posts', 'post') .where('user.id = :id', { id: 1 }) .getMany();// 多层关联const users = await dataSource .createQueryBuilder('user') .leftJoinAndSelect('user.posts', 'post') .leftJoinAndSelect('post.comments', 'comment') .leftJoinAndSelect('comment.author', 'commentAuthor') .getMany();Join 条件const users = await dataSource .createQueryBuilder('user') .leftJoin('user.posts', 'post', 'post.status = :status', { status: 'published' }) .addSelect(['post.title', 'post.createdAt']) .getMany();排序和分页排序// 单字段排序const users = await dataSource .createQueryBuilder('user') .orderBy('user.createdAt', 'DESC') .getMany();// 多字段排序const users = await dataSource .createQueryBuilder('user') .orderBy('user.createdAt', 'DESC') .addOrderBy('user.name', 'ASC') .getMany();// 随机排序 (MySQL)const users = await dataSource .createQueryBuilder('user') .orderBy('RAND()') .getMany();分页// 基本分页const page = 1;const pageSize = 10;const users = await dataSource .createQueryBuilder('user') .skip((page - 1) * pageSize) .take(pageSize) .getMany();// 获取总数和分页数据const [users, total] = await dataSource .createQueryBuilder('user') .skip((page - 1) * pageSize) .take(pageSize) .getManyAndCount();console.log(`Total: ${total}, Page: ${page}, PageSize: ${pageSize}`);聚合查询Group By 和 Having// 按角色分组统计用户数const result = await dataSource .createQueryBuilder('user') .select('user.role', 
'role') .addSelect('COUNT(*)', 'count') .groupBy('user.role') .getRawMany();// 使用 Having 过滤分组const result = await dataSource .createQueryBuilder('user') .select('user.role', 'role') .addSelect('COUNT(*)', 'count') .groupBy('user.role') .having('COUNT(*) > :minCount', { minCount: 5 }) .getRawMany();聚合函数// 统计总数const count = await dataSource .createQueryBuilder('user') .select('COUNT(*)', 'count') .getRawOne();// 计算平均值const avgAge = await dataSource .createQueryBuilder('user') .select('AVG(user.age)', 'avgAge') .getRawOne();// 求和const totalScore = await dataSource .createQueryBuilder('user') .select('SUM(user.score)', 'totalScore') .getRawOne();// 最大值和最小值const result = await dataSource .createQueryBuilder('user') .select('MAX(user.age)', 'maxAge') .addSelect('MIN(user.age)', 'minAge') .getRawOne();子查询使用 SubQueryFactoryimport { SubQueryFactory } from 'typeorm';const users = await dataSource .createQueryBuilder('user') .where((qb: SelectQueryBuilder<User>) => { const subQuery = qb .subQuery() .select('post.userId') .from(Post, 'post') .where('post.title LIKE :title', { title: '%TypeORM%' }) .getQuery(); return 'user.id IN ' + subQuery; }) .setParameter('title', '%TypeORM%') .getMany();使用 EXISTSconst users = await dataSource .createQueryBuilder('user') .where((qb: SelectQueryBuilder<User>) => { const subQuery = qb .subQuery() .select('1') .from(Post, 'post') .where('post.userId = user.id') .getQuery(); return 'EXISTS ' + subQuery; }) .getMany();更新和删除更新操作// 简单更新await dataSource .createQueryBuilder(User, 'user') .update() .set({ name: 'Updated Name' }) .where('id = :id', { id: 1 }) .execute();// 条件更新await dataSource .createQueryBuilder(User, 'user') .update() .set({ isActive: false }) .where('user.lastLoginAt < :date', { date: new Date('2024-01-01') }) .execute();// 基于子查询的更新await dataSource .createQueryBuilder(User, 'user') .update() .set({ score: () => 'score + 10' }) .where('user.id IN :ids', { ids: [1, 2, 3] }) .execute();删除操作// 简单删除await dataSource 
.createQueryBuilder(User, 'user') .delete() .where('id = :id', { id: 1 }) .execute();// 条件删除await dataSource .createQueryBuilder(User, 'user') .delete() .where('user.createdAt < :date', { date: new Date('2023-01-01') }) .andWhere('user.isActive = :isActive', { isActive: false }) .execute();高级特性原生 SQLconst users = await dataSource .createQueryBuilder(User, 'user') .where('user.id = :id', { id: 1 }) .andWhere('JSON_CONTAINS(user.preferences, :preferences)', { preferences: JSON.stringify({ theme: 'dark' }) }) .getMany();缓存const users = await dataSource .createQueryBuilder('user') .where('user.isActive = :isActive', { isActive: true }) .cache(60000) // 缓存 60 秒 .getMany();事务await dataSource.transaction(async transactionalEntityManager => { const queryRunner = transactionalEntityManager.queryRunner; await queryRunner.manager .createQueryBuilder(User, 'user') .insert() .values({ name: 'John', email: 'john@example.com' }) .execute(); await queryRunner.manager .createQueryBuilder(Post, 'post') .insert() .values({ title: 'New Post', authorId: 1 }) .execute();});性能优化建议避免 N+1 查询: 使用 leftJoinAndSelect 一次性加载关联数据只选择需要的字段: 使用 select() 明确指定需要的列合理使用索引: 为常用查询条件添加数据库索引使用缓存: 对不常变化的数据启用查询缓存限制返回结果: 使用 take() 和 skip() 实现分页监控查询性能: 使用 getQuery() 和 getSql() 查看生成的 SQLQueryBuilder 是 TypeORM 中最强大的查询工具,掌握它的使用可以让你构建出高效、灵活的数据库查询。
服务端阅读 02月18日 22:19

TypeORM 的核心概念是什么?包括 Entity、Repository、DataSource 等主要组件的详细说明

TypeORM 是一个基于 TypeScript 和 JavaScript 的 ORM 框架,它同时支持 Active Record 和 Data Mapper 两种设计模式(项目中可按需选择其一),让开发者能够使用面向对象的方式来操作关系型数据库。核心概念1. Entity (实体)Entity 是 TypeORM 的核心概念,它对应数据库中的表。每个 Entity 类都使用 @Entity() 装饰器标记,并映射到数据库表。@Entity()export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @Column({ unique: true }) email: string; @CreateDateColumn() createdAt: Date; @UpdateDateColumn() updatedAt: Date;}2. Column (列)Column 装饰器用于定义实体属性如何映射到数据库列:@Column(): 基本列定义@PrimaryGeneratedColumn(): 自动生成的主键(默认自增,也可指定 'uuid' 等策略)@CreateDateColumn(): 自动创建时间@UpdateDateColumn(): 自动更新时间@Generated(): 自动生成值3. Repository (仓储)Repository 是用于操作实体的主要接口,提供了 CRUD 操作:const userRepository = dataSource.getRepository(User);// 创建const user = userRepository.create({ name: 'John', email: 'john@example.com' });await userRepository.save(user);// 查询const users = await userRepository.find();const user = await userRepository.findOne({ where: { id: 1 } });// 更新await userRepository.update(1, { name: 'John Updated' });// 删除await userRepository.delete(1);4. DataSource (数据源)DataSource 是数据库连接的配置和管理中心:const dataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'test', entities: [User], synchronize: true,});注意:创建 DataSource 后需先调用 await dataSource.initialize() 建立连接才能使用。synchronize: true 会在每次启动时根据实体定义自动修改表结构,可能造成数据丢失,只应在开发环境使用,生产环境应改用迁移(migrations)。5. 
Relation (关系)TypeORM 支持四种主要的关系类型:One-to-One (一对一): 使用 @OneToOne() 装饰器One-to-Many (一对多): 使用 @OneToMany() 装饰器Many-to-One (多对一): 使用 @ManyToOne() 装饰器Many-to-Many (多对多): 使用 @ManyToMany() 装饰器@Entity()export class Profile { @OneToOne(() => User, user => user.profile) user: User;}@Entity()export class User { @OneToOne(() => Profile, profile => profile.user) profile: Profile; @OneToMany(() => Post, post => post.author) posts: Post[];}设计模式Active Record 模式在 Active Record 模式中,实体本身包含业务逻辑和数据访问方法:@Entity()export class User extends BaseEntity { @PrimaryGeneratedColumn() id: number; @Column() name: string; static async findByName(name: string) { return this.find({ where: { name } }); }}// 使用const users = await User.findByName('John');Data Mapper 模式在 Data Mapper 模式中,数据访问逻辑与实体分离,通过 Repository 操作:@Entity()export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string;}// 使用 Repositoryconst userRepository = dataSource.getRepository(User);const users = await userRepository.find({ where: { name: 'John' } });关键特性类型安全: 完全支持 TypeScript,提供编译时类型检查装饰器驱动: 使用装饰器简化配置迁移系统: 支持数据库迁移和版本控制查询构建器: 提供灵活的查询构建 API事务支持: 完整的事务管理缓存支持: 内置查询缓存机制多数据库支持: MySQL, PostgreSQL, SQLite, SQL Server, Oracle 等TypeORM 的这些核心概念使其成为 Node.js 生态中最流行的 ORM 框架之一,特别适合需要类型安全和面向对象编程风格的项目。
服务端阅读 02月18日 22:18

TypeORM 如何在微服务架构中使用?包括数据一致性、分布式事务和服务间通信

在微服务架构中使用 TypeORM 需要考虑数据一致性、服务间通信、分布式事务等复杂问题。微服务中的数据管理1. 数据库分离策略// 用户服务 - user-service// config/database.tsimport { DataSource } from 'typeorm';export const userDataSource = new DataSource({ type: 'postgres', host: process.env.USER_DB_HOST || 'localhost', port: parseInt(process.env.USER_DB_PORT || '5432'), username: process.env.USER_DB_USER || 'user_service', password: process.env.USER_DB_PASSWORD || 'password', database: process.env.USER_DB_NAME || 'user_db', entities: [User, UserProfile, UserSettings], synchronize: false, logging: true,});// 订单服务 - order-serviceexport const orderDataSource = new DataSource({ type: 'postgres', host: process.env.ORDER_DB_HOST || 'localhost', port: parseInt(process.env.ORDER_DB_PORT || '5432'), username: process.env.ORDER_DB_USER || 'order_service', password: process.env.ORDER_DB_PASSWORD || 'password', database: process.env.ORDER_DB_NAME || 'order_db', entities: [Order, OrderItem, Payment], synchronize: false, logging: true,});// 产品服务 - product-serviceexport const productDataSource = new DataSource({ type: 'postgres', host: process.env.PRODUCT_DB_HOST || 'localhost', port: parseInt(process.env.PRODUCT_DB_PORT || '5432'), username: process.env.PRODUCT_DB_USER || 'product_service', password: process.env.PRODUCT_DB_PASSWORD || 'password', database: process.env.PRODUCT_DB_NAME || 'product_db', entities: [Product, Category, Inventory], synchronize: false, logging: true,});2. 
服务间数据同步// 用户服务 - 同步用户数据到其他服务import { EventEmitter } from 'events';class UserSyncService extends EventEmitter { async syncUserCreated(user: User) { // 发送用户创建事件 this.emit('user.created', { userId: user.id, email: user.email, name: user.name, timestamp: new Date(), }); } async syncUserUpdated(user: User) { // 发送用户更新事件 this.emit('user.updated', { userId: user.id, email: user.email, name: user.name, timestamp: new Date(), }); } async syncUserDeleted(userId: number) { // 发送用户删除事件 this.emit('user.deleted', { userId, timestamp: new Date(), }); }}// 在用户服务中使用@EventSubscriber()export class UserSubscriber implements EntitySubscriberInterface<User> { private syncService: UserSyncService; constructor() { this.syncService = new UserSyncService(); } listenTo() { return User; } afterInsert(event: InsertEvent<User>) { this.syncService.syncUserCreated(event.entity); } afterUpdate(event: UpdateEvent<User>) { this.syncService.syncUserUpdated(event.entity); } afterRemove(event: RemoveEvent<User>) { this.syncService.syncUserDeleted(event.entity.id); }}分布式事务处理1. 
Saga 模式实现// 订单创建 Sagaclass OrderCreationSaga { private steps: SagaStep[] = []; constructor( private orderDataSource: DataSource, private userDataSource: DataSource, private productDataSource: DataSource ) { this.setupSteps(); } private setupSteps() { // 步骤 1: 验证用户 this.steps.push({ name: 'validateUser', execute: async (data: any) => { const userRepo = this.userDataSource.getRepository(User); const user = await userRepo.findOne({ where: { id: data.userId }, }); if (!user) { throw new Error('User not found'); } return { ...data, user }; }, compensate: async (data: any) => { // 验证步骤无需补偿 }, }); // 步骤 2: 检查库存 this.steps.push({ name: 'checkInventory', execute: async (data: any) => { const inventoryRepo = this.productDataSource.getRepository(Inventory); for (const item of data.items) { const inventory = await inventoryRepo.findOne({ where: { productId: item.productId }, }); if (!inventory || inventory.quantity < item.quantity) { throw new Error(`Insufficient inventory for product ${item.productId}`); } } return data; }, compensate: async (data: any) => { // 检查库存步骤无需补偿 }, }); // 步骤 3: 扣减库存 this.steps.push({ name: 'reserveInventory', execute: async (data: any) => { const inventoryRepo = this.productDataSource.getRepository(Inventory); for (const item of data.items) { await inventoryRepo.decrement( { productId: item.productId }, 'quantity', item.quantity ); } return data; }, compensate: async (data: any) => { // 补偿:恢复库存 const inventoryRepo = this.productDataSource.getRepository(Inventory); for (const item of data.items) { await inventoryRepo.increment( { productId: item.productId }, 'quantity', item.quantity ); } }, }); // 步骤 4: 创建订单 this.steps.push({ name: 'createOrder', execute: async (data: any) => { const orderRepo = this.orderDataSource.getRepository(Order); const order = orderRepo.create({ userId: data.userId, items: data.items, totalAmount: data.totalAmount, status: 'pending', }); const savedOrder = await orderRepo.save(order); return { ...data, orderId: savedOrder.id 
}; }, compensate: async (data: any) => { // 补偿:删除订单 const orderRepo = this.orderDataSource.getRepository(Order); await orderRepo.delete(data.orderId); }, }); // 步骤 5: 处理支付 this.steps.push({ name: 'processPayment', execute: async (data: any) => { const paymentRepo = this.orderDataSource.getRepository(Payment); const payment = paymentRepo.create({ orderId: data.orderId, amount: data.totalAmount, status: 'processing', }); await paymentRepo.save(payment); // 模拟支付处理 await this.processPaymentAsync(payment); return data; }, compensate: async (data: any) => { // 补偿:取消支付 const paymentRepo = this.orderDataSource.getRepository(Payment); await paymentRepo.update( { orderId: data.orderId }, { status: 'cancelled' } ); }, }); } async execute(data: any): Promise<any> { const executedSteps: SagaStep[] = []; try { for (const step of this.steps) { console.log(`Executing step: ${step.name}`); data = await step.execute(data); executedSteps.push(step); } // 所有步骤成功,更新订单状态 await this.orderDataSource.getRepository(Order).update( { id: data.orderId }, { status: 'completed' } ); return data; } catch (error) { console.error('Saga failed, compensating...', error); // 执行补偿操作 for (let i = executedSteps.length - 1; i >= 0; i--) { const step = executedSteps[i]; console.log(`Compensating step: ${step.name}`); try { await step.compensate(data); } catch (compensationError) { console.error(`Compensation failed for step ${step.name}:`, compensationError); } } throw error; } } private async processPaymentAsync(payment: Payment): Promise<void> { // 模拟支付处理 return new Promise((resolve) => { setTimeout(() => { resolve(); }, 1000); }); }}interface SagaStep { name: string; execute: (data: any) => Promise<any>; compensate: (data: any) => Promise<void>;}2. 
两阶段提交 (2PC)// 两阶段提交协调器class TwoPhaseCommitCoordinator { private participants: TwoPhaseCommitParticipant[] = []; registerParticipant(participant: TwoPhaseCommitParticipant) { this.participants.push(participant); } async execute(): Promise<void> { // 阶段 1: 准备 console.log('Phase 1: Prepare'); for (const participant of this.participants) { await participant.prepare(); } // 阶段 2: 提交 console.log('Phase 2: Commit'); for (const participant of this.participants) { await participant.commit(); } } async rollback(): Promise<void> { console.log('Rolling back'); for (const participant of this.participants) { await participant.rollback(); } }}interface TwoPhaseCommitParticipant { prepare(): Promise<void>; commit(): Promise<void>; rollback(): Promise<void>;}// 用户服务参与者class UserServiceParticipant implements TwoPhaseCommitParticipant { constructor(private dataSource: DataSource) {} async prepare(): Promise<void> { console.log('UserService: Preparing...'); // 准备用户数据 } async commit(): Promise<void> { console.log('UserService: Committing...'); // 提交用户数据 } async rollback(): Promise<void> { console.log('UserService: Rolling back...'); // 回滚用户数据 }}// 订单服务参与者class OrderServiceParticipant implements TwoPhaseCommitParticipant { constructor(private dataSource: DataSource) {} async prepare(): Promise<void> { console.log('OrderService: Preparing...'); // 准备订单数据 } async commit(): Promise<void> { console.log('OrderService: Committing...'); // 提交订单数据 } async rollback(): Promise<void> { console.log('OrderService: Rolling back...'); // 回滚订单数据 }}注意:以上只是协议流程的示意。execute 在某个参与者 prepare 失败时并不会自动调用 rollback,调用方必须自行捕获异常并回滚。真正的 2PC 还需要数据库层面的支持(如 PostgreSQL 的 PREPARE TRANSACTION / COMMIT PREPARED、MySQL 的 XA 事务),TypeORM 本身并不提供跨数据源的分布式事务。由于 2PC 在协调者故障时可能长时间锁定资源,微服务中通常优先考虑上文的 Saga 模式。事件驱动架构1.
事件总线实现// 事件总线class EventBus { private eventQueue: any[] = []; private subscribers: Map<string, Function[]> = new Map(); publish(event: any) { console.log(`Publishing event: ${event.type}`); this.eventQueue.push(event); this.processEvent(event); } subscribe(eventType: string, handler: Function) { if (!this.subscribers.has(eventType)) { this.subscribers.set(eventType, []); } this.subscribers.get(eventType)!.push(handler); } private processEvent(event: any) { const handlers = this.subscribers.get(event.type); if (handlers) { handlers.forEach(handler => { try { handler(event); } catch (error) { console.error(`Error processing event ${event.type}:`, error); } }); } }}// 全局事件总线实例export const eventBus = new EventBus();// 在用户服务中发布事件@EventSubscriber()export class UserEventPublisher implements EntitySubscriberInterface<User> { listenTo() { return User; } afterInsert(event: InsertEvent<User>) { eventBus.publish({ type: 'user.created', payload: { userId: event.entity.id, email: event.entity.email, name: event.entity.name, timestamp: new Date(), }, }); } afterUpdate(event: UpdateEvent<User>) { eventBus.publish({ type: 'user.updated', payload: { userId: event.entity.id, email: event.entity.email, name: event.entity.name, timestamp: new Date(), }, }); }}// 在订单服务中订阅用户事件eventBus.subscribe('user.created', async (event: any) => { console.log('OrderService received user.created event:', event); // 处理用户创建事件 await handleUserCreated(event.payload);});eventBus.subscribe('user.updated', async (event: any) => { console.log('OrderService received user.updated event:', event); // 处理用户更新事件 await handleUserUpdated(event.payload);});2. 
消息队列集成// 使用 RabbitMQ 作为消息队列import amqp from 'amqplib';class MessageQueueService { private connection: any; private channel: any; async connect() { this.connection = await amqp.connect('amqp://localhost'); this.channel = await this.connection.createChannel(); } async publish(queue: string, message: any) { await this.channel.assertQueue(queue, { durable: true }); this.channel.sendToQueue( queue, Buffer.from(JSON.stringify(message)), { persistent: true } ); } async subscribe(queue: string, handler: Function) { await this.channel.assertQueue(queue, { durable: true }); this.channel.consume(queue, async (msg: any) => { try { const message = JSON.parse(msg.content.toString()); await handler(message); this.channel.ack(msg); } catch (error) { console.error('Error processing message:', error); this.channel.nack(msg, false, true); // 重新入队 } }); } async close() { await this.channel.close(); await this.connection.close(); }}// 在用户服务中发布消息const messageQueue = new MessageQueueService();await messageQueue.connect();@EventSubscriber()export class UserMessagePublisher implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { await messageQueue.publish('user.created', { userId: event.entity.id, email: event.entity.email, name: event.entity.name, timestamp: new Date(), }); }}// 在订单服务中订阅消息await messageQueue.subscribe('user.created', async (message: any) => { console.log('OrderService received user.created message:', message); await handleUserCreated(message);});数据一致性策略1. 
最终一致性// 用户数据同步服务class UserDataSyncService { async syncUserToOrderService(user: User) { try { // 调用订单服务 API await fetch('http://order-service/api/users/sync', { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ userId: user.id, email: user.email, name: user.name, }), }); } catch (error) { // 记录失败,稍后重试 console.error('Failed to sync user to order service:', error); await this.scheduleRetry(user); } } async scheduleRetry(user: User) { // 使用延迟队列重试 setTimeout(() => { this.syncUserToOrderService(user); }, 5000); // 5 秒后重试 }}// 在用户服务中使用@EventSubscriber()export class UserSyncSubscriber implements EntitySubscriberInterface<User> { private syncService: UserDataSyncService; constructor() { this.syncService = new UserDataSyncService(); } listenTo() { return User; } afterInsert(event: InsertEvent<User>) { this.syncService.syncUserToOrderService(event.entity); } afterUpdate(event: UpdateEvent<User>) { this.syncService.syncUserToOrderService(event.entity); }}2. 幂等性处理// 幂等性处理器class IdempotencyHandler { private processedIds: Set<string> = new Set(); async processWithIdempotency( id: string, operation: () => Promise<void> ): Promise<void> { if (this.processedIds.has(id)) { console.log(`Operation ${id} already processed, skipping`); return; } await operation(); this.processedIds.add(id); } // 使用数据库存储已处理的 ID async processWithDatabaseIdempotency( id: string, operation: () => Promise<void> ): Promise<void> { const idempotencyRepo = this.dataSource.getRepository(IdempotencyKey); const existing = await idempotencyRepo.findOne({ where: { key: id }, }); if (existing) { console.log(`Operation ${id} already processed, skipping`); return; } await operation(); await idempotencyRepo.save({ key: id, processedAt: new Date(), }); }}// 在消息处理中使用const idempotencyHandler = new IdempotencyHandler();await messageQueue.subscribe('user.created', async (message: any) => { await idempotencyHandler.processWithDatabaseIdempotency( `user.created.${message.userId}`, async () 
=> { await handleUserCreated(message); } );});服务发现与负载均衡1. 服务注册// 服务注册中心class ServiceRegistry { private services: Map<string, ServiceInstance[]> = new Map(); register(serviceName: string, instance: ServiceInstance) { if (!this.services.has(serviceName)) { this.services.set(serviceName, []); } this.services.get(serviceName)!.push(instance); console.log(`Registered ${serviceName} instance:`, instance); } unregister(serviceName: string, instance: ServiceInstance) { const instances = this.services.get(serviceName); if (instances) { const index = instances.indexOf(instance); if (index > -1) { instances.splice(index, 1); console.log(`Unregistered ${serviceName} instance:`, instance); } } } discover(serviceName: string): ServiceInstance | null { const instances = this.services.get(serviceName); if (!instances || instances.length === 0) { return null; } // 轮询负载均衡 const index = Math.floor(Math.random() * instances.length); return instances[index]; }}interface ServiceInstance { host: string; port: number; healthCheckUrl: string;}// 在服务启动时注册const serviceRegistry = new ServiceRegistry();serviceRegistry.register('user-service', { host: 'localhost', port: 3001, healthCheckUrl: 'http://localhost:3001/health',});serviceRegistry.register('order-service', { host: 'localhost', port: 3002, healthCheckUrl: 'http://localhost:3002/health',});2. 
健康检查// 健康检查服务class HealthCheckService { async checkService(instance: ServiceInstance): Promise<boolean> { try { const response = await fetch(instance.healthCheckUrl); return response.ok; } catch (error) { console.error(`Health check failed for ${instance.host}:${instance.port}:`, error); return false; } } async startHealthChecks(registry: ServiceRegistry) { setInterval(async () => { const services = ['user-service', 'order-service', 'product-service']; for (const serviceName of services) { const instance = registry.discover(serviceName); if (instance) { const isHealthy = await this.checkService(instance); if (!isHealthy) { console.log(`Service ${serviceName} is unhealthy, removing from registry`); registry.unregister(serviceName, instance); } } } }, 30000); // 每 30 秒检查一次 }}// 启动健康检查const healthCheckService = new HealthCheckService();healthCheckService.startHealthChecks(serviceRegistry);TypeORM 在微服务架构中的应用需要仔细设计数据管理、事务处理和通信机制,以确保系统的可靠性和可扩展性。
服务端阅读 02月18日 19:12

TypeORM 如何使用验证器?包括 class-validator 的集成和自定义验证器的实现

数据验证是应用程序开发中的重要环节,TypeORM 可以与各种验证器库集成,确保数据的完整性和一致性。本文将详细介绍 TypeORM 中如何使用验证器进行数据验证。验证器基础概念什么是验证器验证器是用于验证数据是否符合特定规则的机制,包括:字段类型验证字段格式验证字段长度验证自定义业务规则验证跨字段验证常用验证器库class-validator: 最流行的 TypeScript 验证器库class-transformer: 用于对象转换和验证joi: 强大的对象模式验证库zod: TypeScript 优先的模式验证库使用 class-validator安装依赖npm install class-validator class-transformernpm install --save-dev @types/class-transformer基本验证示例import { Entity, PrimaryGeneratedColumn, Column, BeforeInsert, BeforeUpdate} from 'typeorm';import { IsEmail, IsNotEmpty, IsString, MinLength, MaxLength, IsInt, Min, Max, IsOptional, IsDateString, IsEnum, ValidateIf, ValidateNested} from 'class-validator';import { Type } from 'class-transformer';@Entity()export class User { @PrimaryGeneratedColumn() id: number; @Column() @IsNotEmpty({ message: 'Name cannot be empty' }) @IsString({ message: 'Name must be a string' }) @MinLength(2, { message: 'Name must be at least 2 characters' }) @MaxLength(100, { message: 'Name must not exceed 100 characters' }) name: string; @Column({ unique: true }) @IsEmail({}, { message: 'Invalid email format' }) @IsNotEmpty({ message: 'Email cannot be empty' }) email: string; @Column({ nullable: true }) @IsOptional() @MinLength(8, { message: 'Password must be at least 8 characters' }) @MaxLength(100, { message: 'Password must not exceed 100 characters' }) password?: string; @Column({ type: 'int', nullable: true }) @IsOptional() @IsInt({ message: 'Age must be an integer' }) @Min(18, { message: 'Age must be at least 18' }) @Max(120, { message: 'Age must not exceed 120' }) age?: number; @Column({ type: 'enum', enum: ['active', 'inactive', 'suspended'], default: 'active' }) @IsEnum(['active', 'inactive', 'suspended'], { message: 'Invalid status' }) status: string; @Column({ type: 'date', nullable: true }) @IsOptional() @IsDateString({}, { message: 'Invalid date format' }) birthDate?: Date; @BeforeInsert() @BeforeUpdate() async validate() { const errors = await validate(this); if (errors.length > 0) { throw new 
Error(`Validation failed: ${JSON.stringify(errors)}`); } }}高级验证自定义验证器import { ValidatorConstraint, ValidatorConstraintInterface, registerDecorator, ValidationOptions } from 'class-validator';// 自定义验证器:检查用户名是否唯一@ValidatorConstraint({ name: 'isUsernameUnique', async: true })export class IsUsernameUniqueConstraint implements ValidatorConstraintInterface { async validate(username: string) { // 这里应该查询数据库检查用户名是否唯一 // 示例代码 const userExists = await checkUsernameExists(username); return !userExists; } defaultMessage(args: ValidationArguments) { return 'Username already exists'; }}// 自定义装饰器export function IsUsernameUnique(validationOptions?: ValidationOptions) { return function (object: Object, propertyName: string) { registerDecorator({ target: object.constructor, propertyName: propertyName, options: validationOptions, constraints: [], validator: IsUsernameUniqueConstraint, }); };}// 使用自定义验证器@Entity()export class User { @Column({ unique: true }) @IsUsernameUnique({ message: 'Username already exists' }) username: string;}条件验证import { ValidateIf } from 'class-validator';@Entity()export class User { @Column() @IsNotEmpty() accountType: 'personal' | 'business'; @Column({ nullable: true }) @ValidateIf(o => o.accountType === 'business') @IsNotEmpty({ message: 'Company name is required for business accounts' }) companyName?: string; @Column({ nullable: true }) @ValidateIf(o => o.accountType === 'business') @IsNotEmpty({ message: 'Tax ID is required for business accounts' }) taxId?: string; @Column({ nullable: true }) @ValidateIf(o => o.accountType === 'personal') @IsNotEmpty({ message: 'Personal ID is required for personal accounts' }) personalId?: string;}嵌套对象验证import { ValidateNested, Type } from 'class-transformer';import { IsNotEmpty, IsString, ValidateIf } from 'class-validator';class Address { @IsNotEmpty() @IsString() street: string; @IsNotEmpty() @IsString() city: string; @IsNotEmpty() @IsString() zipCode: string;}@Entity()export class User { @PrimaryGeneratedColumn() id: 
number; @Column() @IsNotEmpty() name: string; @Column({ type: 'json', nullable: true }) @ValidateIf(o => o.hasAddress) @ValidateNested() @Type(() => Address) address?: Address; @Column({ default: false }) hasAddress: boolean;}跨字段验证import { ValidatorConstraint, ValidatorConstraintInterface, registerDecorator, ValidationOptions, ValidationArguments } from 'class-validator';// 自定义验证器:确认密码匹配@ValidatorConstraint({ name: 'isPasswordMatching', async: false })export class IsPasswordMatchingConstraint implements ValidatorConstraintInterface { validate(password: string, args: ValidationArguments) { const object = args.object as any; return password === object.password; } defaultMessage(args: ValidationArguments) { return 'Passwords do not match'; }}export function IsPasswordMatching(validationOptions?: ValidationOptions) { return function (object: Object, propertyName: string) { registerDecorator({ target: object.constructor, propertyName: propertyName, options: validationOptions, constraints: [], validator: IsPasswordMatchingConstraint, }); };}@Entity()export class User { @Column() @IsNotEmpty() @MinLength(8) password: string; @Column({ nullable: true }) @IsPasswordMatching({ message: 'Passwords do not match' }) confirmPassword?: string;}验证错误处理验证并获取错误import { validate, ValidationError } from 'class-validator';async function createUser(userData: Partial<User>) { const user = new User(); Object.assign(user, userData); const errors = await validate(user); if (errors.length > 0) { // 格式化错误信息 const formattedErrors = this.formatValidationErrors(errors); throw new Error(`Validation failed: ${JSON.stringify(formattedErrors)}`); } // 保存用户 return await userRepository.save(user);}function formatValidationErrors(errors: ValidationError[]): any { const result: any = {}; errors.forEach(error => { const constraints = error.constraints || {}; result[error.property] = Object.values(constraints).join(', '); if (error.children && error.children.length > 0) { result[error.property] = { 
...result[error.property], ...this.formatValidationErrors(error.children) }; } }); return result;}// 使用示例try { const user = await createUser({ name: '', email: 'invalid-email', age: 15 });} catch (error) { console.error(error.message); // 输出: Validation failed: {"name":"Name cannot be empty","email":"Invalid email format","age":"Age must be at least 18"}}自定义验证中间件import { validate } from 'class-validator';import { plainToClass } from 'class-transformer';export function validationMiddleware<T extends object>( type: new () => T) { return async (req: any, res: any, next: any) => { const dto = plainToClass(type, req.body); const errors = await validate(dto); if (errors.length > 0) { const formattedErrors = formatValidationErrors(errors); return res.status(400).json({ error: 'Validation failed', details: formattedErrors }); } req.body = dto; next(); };}// 在 Express 中使用import express from 'express';const app = express();app.post('/users', validationMiddleware(User), async (req, res) => { const user = await userRepository.save(req.body); res.json(user); });验证器装饰器详解字符串验证@Entity()export class User { @Column() @IsString() @IsNotEmpty() @MinLength(2) @MaxLength(100) @IsAlphanumeric() name: string; @Column() @IsEmail() @IsLowercase() email: string; @Column() @IsUrl() website?: string; @Column() @IsPhoneNumber(null) // 需要安装 class-validator-phone-number phone?: string;}数字验证@Entity()export class Product { @Column({ type: 'decimal', precision: 10, scale: 2 }) @IsNumber() @Min(0) @Max(999999.99) price: number; @Column({ type: 'int' }) @IsInt() @IsPositive() stock: number; @Column({ type: 'int' }) @IsInt() @IsDivisibleBy(10) quantity: number;}日期验证@Entity()export class Event { @Column({ type: 'date' }) @IsDateString() @IsBefore('endDate') startDate: Date; @Column({ type: 'date' }) @IsDateString() @IsAfter('startDate') endDate: Date; @Column({ type: 'date' }) @IsDateString() @IsFuture() registrationDeadline?: Date;}数组和对象验证@Entity()export class User { @Column({ type: 'simple-array' }) 
@IsArray() @ArrayNotEmpty() @ArrayMinSize(1) @ArrayMaxSize(10) @IsString({ each: true }) tags: string[]; @Column({ type: 'json', nullable: true }) @IsObject() @IsNotEmptyObject() metadata?: Record<string, any>; @Column({ type: 'simple-array', nullable: true }) @IsArray() @ArrayUnique() @IsEmail({ each: true }) additionalEmails?: string[];}验证最佳实践1. 分层验证// 实体层验证:数据库级别的验证@Entity()export class User { @Column() @IsNotEmpty() @IsString() name: string; @BeforeInsert() @BeforeUpdate() async validateEntity() { const errors = await validate(this); if (errors.length > 0) { throw new Error(`Entity validation failed: ${JSON.stringify(errors)}`); } }}// DTO 层验证:API 请求级别的验证class CreateUserDto { @IsNotEmpty() @IsString() @MinLength(2) @MaxLength(100) name: string; @IsNotEmpty() @IsEmail() email: string; @IsNotEmpty() @MinLength(8) password: string;}// 在服务层使用 DTO 验证async function createUser(dto: CreateUserDto) { const errors = await validate(dto); if (errors.length > 0) { throw new ValidationException(errors); } const user = new User(); Object.assign(user, dto); return await userRepository.save(user);}2. 异步验证@ValidatorConstraint({ name: 'isEmailUnique', async: true })export class IsEmailUniqueConstraint implements ValidatorConstraintInterface { async validate(email: string) { const user = await userRepository.findOne({ where: { email } }); return !user; } defaultMessage() { return 'Email already exists'; }}@Entity()export class User { @Column({ unique: true }) @IsEmailUnique() email: string;}3. 
国际化错误消息import { ValidatorConstraint, ValidatorConstraintInterface } from 'class-validator';@ValidatorConstraint({ name: 'customValidator', async: false })export class CustomValidatorConstraint implements ValidatorConstraintInterface { validate(value: any, args: ValidationArguments) { return true; } defaultMessage(args: ValidationArguments) { // 根据语言环境返回不同的错误消息 const locale = args.object['locale'] || 'en'; const messages = { en: 'Custom validation failed', zh: '自定义验证失败', ja: 'カスタム検証に失敗しました' }; return messages[locale] || messages.en; }}4. 性能优化// 避免在验证器中执行耗时操作@ValidatorConstraint({ name: 'isUnique', async: true })export class IsUniqueConstraint implements ValidatorConstraintInterface { private cache = new Map<string, boolean>(); async validate(value: any, args: ValidationArguments) { const cacheKey = `${args.targetName}.${args.property}.${value}`; // 检查缓存 if (this.cache.has(cacheKey)) { return this.cache.get(cacheKey); } // 执行验证 const result = await this.checkUniqueness(value, args); // 缓存结果 this.cache.set(cacheKey, result); return result; } private async checkUniqueness(value: any, args: ValidationArguments): Promise<boolean> { // 实际的唯一性检查逻辑 return true; }}5. 
测试验证器import { validate } from 'class-validator';describe('User Validation', () => { it('should validate valid user', async () => { const user = new User(); user.name = 'John Doe'; user.email = 'john@example.com'; user.age = 25; const errors = await validate(user); expect(errors.length).toBe(0); }); it('should fail validation for invalid email', async () => { const user = new User(); user.name = 'John Doe'; user.email = 'invalid-email'; user.age = 25; const errors = await validate(user); expect(errors.length).toBeGreaterThan(0); expect(errors[0].constraints).toHaveProperty('isEmail'); }); it('should fail validation for underage user', async () => { const user = new User(); user.name = 'John Doe'; user.email = 'john@example.com'; user.age = 15; const errors = await validate(user); expect(errors.length).toBeGreaterThan(0); expect(errors[0].constraints).toHaveProperty('min'); });});验证器与其他库集成与 Joi 集成注意:Joi.object() 默认拒绝 schema 中未声明的键,直接校验实体实例时(实例上还有 id 等其他字段)需要在 schema 中声明这些字段,或改用 userSchema.validate(this, { allowUnknown: true })。import * as Joi from 'joi';const userSchema = Joi.object({ name: Joi.string().min(2).max(100).required(), email: Joi.string().email().required(), age: Joi.number().integer().min(18).max(120).optional(), password: Joi.string().min(8).required()});@Entity()export class User { @BeforeInsert() @BeforeUpdate() async validateWithJoi() { const { error } = userSchema.validate(this); if (error) { throw new Error(`Validation failed: ${error.details[0].message}`); } }}与 Zod 集成import { z } from 'zod';const userSchema = z.object({ name: z.string().min(2).max(100), email: z.string().email(), age: z.number().int().min(18).max(120).optional(), password: z.string().min(8)});@Entity()export class User { @BeforeInsert() @BeforeUpdate() async validateWithZod() { const result = userSchema.safeParse(this); if (!result.success) { throw new Error(`Validation failed: ${JSON.stringify(result.error.errors)}`); } }}TypeORM 本身并不内置数据验证功能,上述验证能力来自 class-validator、Joi、Zod 等库,通过实体监听器(@BeforeInsert/@BeforeUpdate)或在服务层/DTO 层显式调用来接入。需要注意:实体监听器只在通过 save()/remove() 操作实体实例时触发,repository.update()、delete() 和 QueryBuilder 不会触发,因此不能只依赖监听器做验证。合理使用验证器可以确保数据的完整性和一致性,提高应用程序的健壮性。
服务端阅读 02月18日 18:25

VS Code 代码片段如何创建和使用?

VS Code 代码片段(Snippets)与自定义模板VS Code 代码片段是可重用的代码模板,可以通过快捷方式快速插入常用代码模式,显著提高编码效率。代码片段文件代码片段存储在 .vscode/*.code-snippets 文件中,或全局存储在用户目录下。创建代码片段通过菜单:File > Preferences > User Snippets选择语言或创建全局代码片段编辑 JSON 格式的代码片段文件代码片段语法基本结构{ "Snippet Name": { "prefix": "trigger", "body": [ "code line 1", "code line 2" ], "description": "Snippet description" }}占位符(Tabstops)使用 $1, $2 等定义光标位置:{ "Function Template": { "prefix": "func", "body": [ "function ${1:functionName}(${2:parameters}) {", "\t$0", "}" ], "description": "Create a function" }}选择占位符(Choice)提供多个选项供选择:{ "Console Log": { "prefix": "log", "body": [ "console.${1|log,warn,error|}($2);" ], "description": "Console log statement" }}变量(Variables)使用预定义变量:{ "File Header": { "prefix": "header", "body": [ "// File: ${TM_FILENAME}", "// Author: ${TM_USERNAME}", "// Date: ${CURRENT_YEAR}-${CURRENT_MONTH}-${CURRENT_DATE}" ], "description": "File header comment" }}常用变量TM_FILENAME: 当前文件名TM_FILENAME_BASE: 不含扩展名的文件名TM_DIRECTORY: 当前文件目录TM_FILEPATH: 当前文件完整路径CLIPBOARD: 剪贴板内容CURRENT_YEAR: 当前年份CURRENT_MONTH: 当前月份CURRENT_DATE: 当前日期语言特定代码片段JavaScript/TypeScript{ "React Component": { "prefix": "react-component", "body": [ "import React from 'react';", "", "interface ${1:ComponentName}Props {", "\t${2:prop}: ${3:type};", "}", "", "const ${1:ComponentName}: React.FC<${1:ComponentName}Props> = ({ ${2:prop} }) => {", "\treturn (", "\t\t<div>", "\t\t\t${4:content}", "\t\t</div>", "\t);", "};", "", "export default ${1:ComponentName};" ], "description": "React functional component with TypeScript" }}Python{ "Python Class": { "prefix": "class", "body": [ "class ${1:ClassName}:", "\t\"\"\"${2:Class description}\"\"\"", "\t", "\tdef __init__(self${3:, args}):", "\t\t${4:pass}" ], "description": "Python class template" }}HTML{ "HTML5 Boilerplate": { "prefix": "html5", "body": [ "<!DOCTYPE html>", "<html lang=\"en\">", "<head>", "\t<meta charset=\"UTF-8\">", "\t<meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">", "\t<title>${1:Page 
Title}</title>", "</head>", "<body>", "\t${2:content}", "</body>", "</html>" ], "description": "HTML5 boilerplate" }}高级功能转换(Transforms)对变量进行转换:{ "Import Statement": { "prefix": "import", "body": [ "import { ${1:${TM_FILENAME_BASE/(.*)/${1:/capitalize}/}} } from './${1}';" ], "description": "Import statement with capitalized name" }}嵌套占位符VS Code 不支持在一个代码片段中调用另一个代码片段,但占位符之间可以嵌套,例如 ${1:another ${2:placeholder}}。全局代码片段创建适用于所有语言的代码片段:在用户代码片段目录中新建扩展名为 .code-snippets 的文件(文件名任意,例如 global.code-snippets),片段中不设置 scope 字段时对所有语言生效:{ "TODO Comment": { "prefix": "todo", "body": [ "// TODO: ${1:description} - ${CURRENT_YEAR}-${CURRENT_MONTH}-${CURRENT_DATE}" ], "description": "TODO comment with date" }}使用技巧触发方式: 输入前缀后,在 IntelliSense 建议列表中按 Enter 或 Tab 插入;如果希望输入前缀后直接按 Tab 展开,需要开启 editor.tabCompletion 设置跳转占位符: 按 Tab 跳到下一个占位符,Shift + Tab 返回上一个多光标编辑: 使用相同的占位符编号实现多光标编辑前缀冲突: 多个代码片段(用户、项目、扩展提供的)使用相同前缀时会同时出现在建议列表中,应通过 description 加以区分注意事项代码片段文件使用 UTF-8 编码避免使用过长的前缀提供清晰的描述测试代码片段在不同场景下的表现考虑团队协作,共享有用的代码片段
服务端阅读 02月18日 18:23

VS Code 多光标编辑有哪些技巧?

VS Code 多光标编辑与高级选择技巧VS Code 多光标编辑功能允许同时在多个位置编辑代码,大幅提高编辑效率。配合高级选择技巧,可以快速完成复杂的批量编辑任务。以下快捷键均为 Windows 上的默认键位,macOS 和 Linux 上部分键位不同,请以"键盘快捷方式"设置页中的实际绑定为准。多光标创建方式鼠标操作Alt + 点击: 在点击位置添加光标Ctrl + Alt + 上/下箭头: 在上方或下方添加光标Ctrl + U: 撤销最后一个光标键盘操作Ctrl + Alt + 上/下箭头: 添加光标Shift + Alt + I: 在选中的每一行末尾各添加一个光标快捷选择Ctrl + D: 选择下一个相同的词Ctrl + Shift + L: 选择所有相同的词Ctrl + F2: 选择所有当前词的实例高级选择技巧列选择模式Shift + Alt + 拖动: 列选择Ctrl + Shift + Alt + 上/下箭头: 列选择扩展智能选择Shift + Alt + →: 扩展选择到下一个语法单元Shift + Alt + ←: 收缩选择Ctrl + Shift + →: 扩展选择到下一个单词Ctrl + Shift + ←: 收缩选择到上一个单词快速跳转和选择Ctrl + G: 跳转到指定行Ctrl + T: 跳转到工作区中的符号Ctrl + Shift + O: 跳转到文件中的符号Ctrl + P: 快速打开文件实用场景批量重命名变量// 原始代码const userName = 'John';const userAge = 25;const userEmail = 'john@example.com';// 操作:选中 userName 开头的 user(不要双击,双击会选中整个 userName,之后 Ctrl + D 找不到其他匹配),按 Ctrl + D 两次选中另外两处 user,然后修改const firstName = 'John';const firstAge = 25;const firstEmail = 'john@example.com';批量修改属性// 原始代码const obj = { name: 'John', age: 25, email: 'john@example.com'};// 操作:选择所有属性名,添加引号const obj = { 'name': 'John', 'age': 25, 'email': 'john@example.com'};批量添加注释// 选择多行,按 Ctrl + /// const line1 = 'code';// const line2 = 'code';// const line3 = 'code';正则表达式查找替换使用正则表达式Ctrl + H: 打开查找替换Alt + R: 启用正则表达式模式Ctrl + Alt + Enter: 替换所有正则表达式示例// 查找: console\.log\((.*)\)// 替换: console.info($1)// 将所有 console.log 替换为 console.info捕获组使用// 查找: (\w+)\.(\w+)\((.*)\)// 替换: $2($1, $3)// 将 obj.method(args) 转换为 method(obj, args)高级编辑技巧多行编辑Ctrl + Enter: 在当前行下方插入新行Ctrl + Shift + Enter: 在当前行上方插入新行Ctrl + Shift + K: 删除当前行代码格式化Shift + Alt + F: 格式化整个文档Ctrl + K, Ctrl + F: 格式化选中部分代码移动Alt + 上/下箭头: 移动当前行Shift + Alt + 上/下箭头: 复制当前行代码折叠Ctrl + K, Ctrl + 0: 折叠所有Ctrl + K, Ctrl + J: 展开所有Ctrl + Shift + [: 折叠当前区域Ctrl + Shift + ]: 展开当前区域Ctrl + K, Ctrl + [: 递归折叠当前区域Ctrl + K, Ctrl + ]: 递归展开当前区域自定义快捷键在 keybindings.json 中自定义快捷键:[ { "key": "ctrl+shift+;", "command": "editor.action.insertCursorAtEndOfEachLineSelected", "when": "editorTextFocus" }]注意事项多光标编辑在大量光标时可能影响性能使用 Ctrl + U 可以撤销光标操作正则表达式替换前建议预览VS Code 本身不内置宏录制,复杂的重复操作可借助宏类扩展实现选区扩展过头时可用 Shift + Alt + ← 逐级收缩(注意 Ctrl + Shift + G 打开的是源代码管理视图,并不是选择历史)
服务端阅读 02月18日 18:22

VS Code 扩展如何发布到市场?

VS Code 扩展发布与市场管理VS Code 扩展发布是将开发完成的扩展发布到 VS Code 市场供其他用户使用的过程。了解发布流程和市场管理对于扩展开发者至关重要。发布前准备扩展验证清单确保扩展功能完整且经过测试编写清晰的 README.md 文档准备扩展图标(至少 128x128 像素的 PNG 图片,不能使用 SVG)添加适当的标签和分类验证 package.json 配置package.json 关键字段{ "name": "my-extension", "displayName": "My Extension", "description": "A useful VS Code extension", "version": "1.0.0", "publisher": "your-publisher-name", "engines": { "vscode": "^1.60.0" }, "categories": [ "Other", "Snippets" ], "keywords": [ "utility", "productivity" ], "icon": "icon.png", "repository": { "type": "git", "url": "https://github.com/username/my-extension" }, "license": "MIT"}发布者注册创建发布者账户访问 https://dev.azure.com/ 使用 Microsoft 账户登录创建新的组织或使用现有组织在该组织中创建 Personal Access Token(Organization 选择 All accessible organizations,Scopes 勾选 Marketplace > Manage),发布时需要用它进行身份验证访问 https://marketplace.visualstudio.com/manage 在 VS Code 市场创建发布者获取发布者 ID(用于 package.json 的 publisher 字段)发布者信息发布者名称:全局唯一,用于标识扩展发布者显示名称:在市场中显示的名称邮箱:用于接收通知发布工具安装安装 vsce(VS Code Extension Manager)npm install -g @vscode/vsce验证安装vsce --version登录发布者vsce login your-publisher-name(按提示粘贴上一步创建的 Personal Access Token)打包扩展基本打包命令vsce package指定输出文件名vsce package --out my-extension-1.0.0.vsix打包选项--baseContentUrl: 设置基础内容 URL--baseImagesUrl: 设置基础图片 URL--yarn: 使用 yarn 而非 npm发布扩展首次发布vsce publish指定版本发布vsce publish minorvsce publish patchvsce publish 1.1.0发布到预发布频道vsce publish --pre-release发布到特定目标vsce publish --target win32-x64vsce publish --target linux-x64 darwin-arm64(多个目标之间用空格分隔)版本管理语义化版本主版本(Major): 不兼容的 API 变更次版本(Minor): 向后兼容的功能新增修订版本(Patch): 向后兼容的问题修复更新 package.json{ "version": "1.1.0"}扩展管理更新扩展修改代码和 package.json 版本号重新打包:vsce package发布新版本:vsce publish下架扩展vsce unpublish your-publisher-name.my-extension(参数是"发布者 ID.扩展名"形式的完整扩展 ID,下架后扩展会从市场移除)删除特定版本vsce 没有删除单个版本的命令(并不存在 vsce delete);发现有问题的版本时,通常做法是尽快发布修复后的更高版本号市场优化SEO 优化使用相关关键词编写吸引人的描述添加适当的标签提供清晰的截图和演示用户评价管理积极回应用户反馈及时修复报告的问题根据用户建议改进功能统计分析访问 VS Code 市场统计页面查看下载量和安装量分析用户行为和反馈注意事项确保扩展遵守 VS Code 市场政策不要发布恶意或有害的扩展定期更新扩展以保持兼容性保护好发布者账户的访问令牌考虑开源代码以增加信任度
服务端阅读 02月18日 18:16

VS Code 语言服务器协议(LSP)是什么?

语言服务器协议(Language Server Protocol,LSP)是 VS Code 推出的一种协议,用于将编辑器功能与特定语言实现分离,实现代码补全、跳转定义、错误检查等智能功能。LSP 架构LSP 采用客户端-服务器架构:客户端: VS Code 编辑器,负责 UI 和用户交互服务器: 语言服务器,负责语言特定的分析功能两者通过 JSON-RPC 协议通信。核心功能代码补全(Completion)// 客户端请求{ "jsonrpc": "2.0", "id": 1, "method": "textDocument/completion", "params": { "textDocument": { "uri": "file:///path/to/file.ts" }, "position": { "line": 5, "character": 10 } }}// 服务器响应{ "jsonrpc": "2.0", "id": 1, "result": { "isIncomplete": false, "items": [ { "label": "console", "kind": 3, "detail": "any", "documentation": "The console module..." } ] }}跳转定义(Go to Definition){ "method": "textDocument/definition", "params": { "textDocument": { "uri": "..." }, "position": { "line": 0, "character": 0 } }}悬停提示(Hover){ "method": "textDocument/hover", "params": { "textDocument": { "uri": "..." }, "position": { "line": 0, "character": 0 } }}诊断信息(Diagnostics)服务器主动推送错误和警告信息:{ "method": "textDocument/publishDiagnostics", "params": { "uri": "file:///path/to/file.ts", "diagnostics": [ { "range": { "start": { "line": 0, "character": 0 }, "end": { "line": 0, "character": 5 } }, "severity": 1, "message": "Cannot find name 'foo'" } ] }}VS Code 扩展中的 LSP创建语言服务器扩展安装依赖npm install vscode-languageclient --save配置 package.json{ "contributes": { "languages": [{ "id": "mylang", "aliases": ["My Language", "mylang"], "extensions": [".mylang"] }], "grammars": [{ "language": "mylang", "scopeName": "source.mylang", "path": "./syntaxes/mylang.tmLanguage.json" }] }, "activationEvents": [ "onLanguage:mylang" ]}实现客户端import * as vscode from 'vscode';import { LanguageClient, LanguageClientOptions } from 'vscode-languageclient';let client: LanguageClient;export function activate(context: vscode.ExtensionContext) { const serverOptions = { command: 'node', args: [context.asAbsolutePath('server/out/server.js')] }; const clientOptions: LanguageClientOptions = { documentSelector: [{ scheme: 'file', language: 'mylang' }], synchronize: { configurationSection: 'mylang' } }; client = new 
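LanguageClient( 'mylang', 'My Language Server', serverOptions, clientOptions ); client.start();}(注:vscode-languageclient 7.0 及以上版本需要从 'vscode-languageclient/node' 导入 LanguageClient,而不是直接从 'vscode-languageclient' 导入。)常见语言服务器TypeScript: typescript-language-server(封装 tsserver;VS Code 内置的 TypeScript 支持直接与 tsserver 通信,并不经过 LSP)Python: Pylance(基于 Pyright)Go: goplsRust: rust-analyzerJava: Eclipse JDT Language Server(jdt.ls)LSP 优势跨编辑器支持: 同一个语言服务器可用于多个编辑器解耦: 编辑器与语言实现分离性能优化: 语言服务器可以独立优化可扩展: 易于添加新的语言功能性能优化延迟初始化initializationOptions 会在 initialize 请求中原样发送给服务器,LSP 规范并未定义其中的字段;下面的 deferInitialization 只是一个示例字段,需要服务器自己识别并实现相应的延迟逻辑:const clientOptions: LanguageClientOptions = { documentSelector: [{ scheme: 'file', language: 'mylang' }], initializationOptions: { deferInitialization: true }};增量同步与分析使用增量文档同步(TextDocumentSyncKind.Incremental)时,客户端只发送文件的变化部分,服务器可以据此只重新分析受影响的内容,而不是每次都分析整个项目。注意事项语言服务器应正确处理并发请求实现取消机制(响应 $/cancelRequest 通知)以避免资源浪费提供清晰的错误信息考虑内存使用和性能支持工作区配置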
服务端阅读 02月18日 18:07

TensorFlow 中的分布式训练策略有哪些,如何实现多 GPU 训练

TensorFlow 提供了强大的分布式训练能力,支持在单机多 GPU、多机多 GPU 以及 TPU 上进行训练。了解这些策略对于加速大规模模型训练至关重要。分布式训练策略概览TensorFlow 2.x 提供了统一的 tf.distribute.Strategy API,支持以下策略:MirroredStrategy:单机多 GPU 同步训练MultiWorkerMirroredStrategy:多机多 GPU 同步训练TPUStrategy:TPU 训练ParameterServerStrategy:参数服务器架构CentralStorageStrategy:单机多 GPU,参数集中存储MirroredStrategy(单机多 GPU)基本用法import tensorflow as tf# 检查可用的 GPUprint("GPU 数量:", len(tf.config.list_physical_devices('GPU')))# 创建 MirroredStrategystrategy = tf.distribute.MirroredStrategy()print("副本数量:", strategy.num_replicas_in_sync)完整训练示例import tensorflow as tffrom tensorflow.keras import layers, models# 创建策略strategy = tf.distribute.MirroredStrategy()# 在策略作用域内创建和编译模型with strategy.scope(): # 构建模型 model = models.Sequential([ layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)), layers.MaxPooling2D((2, 2)), layers.Conv2D(64, (3, 3), activation='relu'), layers.MaxPooling2D((2, 2)), layers.Flatten(), layers.Dense(128, activation='relu'), layers.Dense(10, activation='softmax') ]) # 编译模型 model.compile( optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'] )# 加载数据(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0# 创建分布式数据集batch_size_per_replica = 64global_batch_size = batch_size_per_replica * strategy.num_replicas_in_synctrain_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))train_dataset = train_dataset.shuffle(10000).batch(global_batch_size).prefetch(tf.data.AUTOTUNE)test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))test_dataset = test_dataset.batch(global_batch_size).prefetch(tf.data.AUTOTUNE)# 训练模型model.fit(train_dataset, epochs=10, validation_data=test_dataset)自定义训练循环import tensorflow as tffrom tensorflow.keras import optimizers, lossesstrategy = tf.distribute.MirroredStrategy()with strategy.scope(): model = models.Sequential([ layers.Dense(128, 
activation='relu', input_shape=(784,)), layers.Dense(10, activation='softmax') ]) optimizer = optimizers.Adam(learning_rate=0.001) loss_fn = losses.SparseCategoricalCrossentropy()# 训练步骤@tf.functiondef train_step(inputs, targets): with tf.GradientTape() as tape: predictions = model(inputs, training=True) per_replica_loss = loss_fn(targets, predictions) loss = tf.reduce_mean(per_replica_loss) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) return loss# 分布式训练步骤@tf.functiondef distributed_train_step(dataset_inputs): per_replica_losses = strategy.run(train_step, args=(dataset_inputs,)) return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)# 训练循环epochs = 10for epoch in range(epochs): total_loss = 0 num_batches = 0 for inputs, targets in train_dataset: loss = distributed_train_step((inputs, targets)) total_loss += loss num_batches += 1 avg_loss = total_loss / num_batches print(f'Epoch {epoch + 1}, Loss: {avg_loss:.4f}')MultiWorkerMirroredStrategy(多机多 GPU)基本配置import tensorflow as tfimport os# 设置环境变量os.environ['TF_CONFIG'] = json.dumps({ 'cluster': { 'worker': ["host1:port", "host2:port", "host3:port"] }, 'task': {'type': 'worker', 'index': 0}})# 创建策略strategy = tf.distribute.MultiWorkerMirroredStrategy()print("副本数量:", strategy.num_replicas_in_sync)使用 TF_CONFIG 配置import jsonimport os# Worker 1 的配置tf_config_worker1 = { 'cluster': { 'worker': ["worker1.example.com:12345", "worker2.example.com:12345"] }, 'task': {'type': 'worker', 'index': 0}}# Worker 2 的配置tf_config_worker2 = { 'cluster': { 'worker': ["worker1.example.com:12345", "worker2.example.com:12345"] }, 'task': {'type': 'worker', 'index': 1}}# 设置环境变量os.environ['TF_CONFIG'] = json.dumps(tf_config_worker1)训练代码(与 MirroredStrategy 相同)with strategy.scope(): model = create_model() model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')model.fit(train_dataset, epochs=10)TPUStrategy(TPU 训练)基本用法import 
tensorflow as tf# 创建 TPU 策略resolver = tf.distribute.cluster_resolver.TPUClusterResolver()tf.config.experimental_connect_to_cluster(resolver)tf.tpu.experimental.initialize_tpu_system(resolver)strategy = tf.distribute.TPUStrategy(resolver)print("TPU 副本数量:", strategy.num_replicas_in_sync)TPU 训练示例with strategy.scope(): model = models.Sequential([ layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)), layers.MaxPooling2D((2, 2)), layers.Flatten(), layers.Dense(128, activation='relu'), layers.Dense(10, activation='softmax') ]) model.compile( optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'] )# 调整批次大小以适应 TPUbatch_size = 1024 # TPU 支持更大的批次大小train_dataset = train_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)model.fit(train_dataset, epochs=10)ParameterServerStrategy(参数服务器)基本配置import tensorflow as tfimport jsonimport os# 参数服务器配置tf_config = { 'cluster': { 'worker': ["worker1.example.com:12345", "worker2.example.com:12345"], 'ps': ["ps1.example.com:12345", "ps2.example.com:12345"] }, 'task': {'type': 'worker', 'index': 0}}os.environ['TF_CONFIG'] = json.dumps(tf_config)# 创建策略strategy = tf.distribute.ParameterServerStrategy()使用 ParameterServerStrategywith strategy.scope(): model = create_model() optimizer = tf.keras.optimizers.Adam() # 自定义训练循环 @tf.function def train_step(inputs, targets): with tf.GradientTape() as tape: predictions = model(inputs) loss = loss_fn(targets, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) return lossCentralStorageStrategy(集中存储)基本用法import tensorflow as tf# 创建策略strategy = tf.distribute.CentralStorageStrategy()print("副本数量:", strategy.num_replicas_in_sync)# 使用方式与 MirroredStrategy 相同with strategy.scope(): model = create_model() model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')model.fit(train_dataset, epochs=10)数据分布策略自动分片# 使用 strategy.experimental_distribute_dataset 自动分片distributed_dataset = 
strategy.experimental_distribute_dataset(dataset)# 或者使用 strategy.distribute_datasets_from_functiondef dataset_fn(input_context): batch_per_replica = 64 global_batch_size = batch_per_replica * input_context.num_replicas_in_sync # 每个输入管道只为部分副本供数,应按"单副本批次大小"批处理,而不是全局批次大小 batch_size = input_context.get_per_replica_batch_size(global_batch_size) dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)) # 先分片,保证各输入管道读取互不重叠的数据 dataset = dataset.shard(input_context.num_input_pipelines, input_context.input_pipeline_id) return dataset.shuffle(10000).batch(batch_size)distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)性能优化技巧1. 混合精度训练from tensorflow.keras import mixed_precision# 启用混合精度policy = mixed_precision.Policy('mixed_float16')mixed_precision.set_global_policy(policy)with strategy.scope(): model = create_model() # 建议将输出层设为 dtype='float32',以保证 softmax 和损失计算的数值稳定 # 使用 model.compile/fit 时,Keras 会在 mixed_float16 策略下自动进行损失缩放; # 显式包装 LossScaleOptimizer 主要用于自定义训练循环 optimizer = mixed_precision.LossScaleOptimizer(tf.keras.optimizers.Adam()) model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy')2. 同步批量归一化# 使用同步批量归一化(普通 BatchNormalization 不会被自动转换,需要显式开启)with strategy.scope(): model = models.Sequential([ layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)), layers.BatchNormalization(synchronized=True), # TF 2.12+;旧版本使用 layers.experimental.SyncBatchNormalization() layers.MaxPooling2D((2, 2)), layers.Flatten(), layers.Dense(10, activation='softmax') ])3. XLA 编译# 启用 XLA 编译tf.config.optimizer.set_jit(True)with strategy.scope(): model = create_model() model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')4. 优化数据加载# 使用 AUTOTUNE 自动优化train_dataset = train_dataset.cache()train_dataset = train_dataset.shuffle(10000)train_dataset = train_dataset.batch(global_batch_size)train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)监控和调试使用 TensorBoardimport datetime# 创建日志目录log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")tensorboard_callback = tf.keras.callbacks.TensorBoard( log_dir=log_dir, histogram_freq=1)# 训练时使用回调model.fit( train_dataset, epochs=10, callbacks=[tensorboard_callback])监控 GPU 使用情况# 查看设备分配print("设备列表:", tf.config.list_physical_devices())# 查看当前设备print("当前设备:", tf.test.gpu_device_name())常见问题和解决方案1. 
内存不足# 减小批次大小batch_size_per_replica = 32 # 从 64 减小到 32# 使用梯度累积# 或者使用模型并行2. 通信开销# 增大批次大小以减少通信频率global_batch_size = 256 * strategy.num_replicas_in_sync# 使用梯度压缩# 或者使用异步更新3. 数据加载瓶颈# 使用缓存train_dataset = train_dataset.cache()# 使用预取train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)# 使用并行加载train_dataset = train_dataset.map( preprocess, num_parallel_calls=tf.data.AUTOTUNE)策略选择指南| 策略 | 适用场景 | 优点 | 缺点 || --------------------------- | ------------- | -------- | ---------- || MirroredStrategy | 单机多 GPU | 简单易用,性能好 | 受限于单机资源 || MultiWorkerMirroredStrategy | 多机多 GPU | 可扩展性强 | 配置复杂,网络开销 || TPUStrategy | TPU 环境 | 极高性能 | 仅限 TPU || ParameterServerStrategy | 大规模异步训练 | 支持超大规模模型 | 实现复杂,收敛慢 || CentralStorageStrategy | 单机多 GPU(参数集中) | 简单,内存效率高 | 参数更新可能成为瓶颈 |完整的多 GPU 训练示例import tensorflow as tffrom tensorflow.keras import layers, models# 1. 创建策略strategy = tf.distribute.MirroredStrategy()# 2. 在策略作用域内构建模型with strategy.scope(): model = models.Sequential([ layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)), layers.MaxPooling2D((2, 2)), layers.Conv2D(64, (3, 3), activation='relu'), layers.MaxPooling2D((2, 2)), layers.Flatten(), layers.Dense(128, activation='relu'), layers.Dropout(0.5), layers.Dense(10, activation='softmax') ]) model.compile( optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'] )# 3. 准备数据(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0# 4. 创建分布式数据集batch_size_per_replica = 64global_batch_size = batch_size_per_replica * strategy.num_replicas_in_synctrain_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))train_dataset = train_dataset.shuffle(10000).batch(global_batch_size).prefetch(tf.data.AUTOTUNE)test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))test_dataset = test_dataset.batch(global_batch_size).prefetch(tf.data.AUTOTUNE)# 5. 
训练模型history = model.fit( train_dataset, epochs=10, validation_data=test_dataset, callbacks=[ tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True), tf.keras.callbacks.ModelCheckpoint('best_model.h5', save_best_only=True) ])# 6. 评估模型test_loss, test_acc = model.evaluate(test_dataset)print(f'Test Accuracy: {test_acc:.4f}')总结TensorFlow 的分布式训练策略提供了灵活且强大的多 GPU 训练能力:MirroredStrategy:最适合单机多 GPU 场景MultiWorkerMirroredStrategy:适用于多机多 GPU 场景TPUStrategy:在 TPU 上获得最佳性能ParameterServerStrategy:支持超大规模异步训练CentralStorageStrategy:单机多 GPU 的替代方案掌握这些策略将帮助你充分利用硬件资源,加速模型训练。
服务端阅读 02月18日 18:02

VS Code 远程开发如何配置和使用?

VS Code 远程开发允许您在远程机器、容器或 WSL 上进行开发,同时享受本地 VS Code 的完整体验。通过 Remote - SSH、Dev Containers 和 WSL 扩展实现(也可安装 "Remote Development" 扩展包一次性获得)。远程开发模式SSH 远程开发通过 SSH 连接到远程服务器进行开发。容器远程开发在 Docker 容器中进行开发。WSL 远程开发在 Windows Subsystem for Linux 中进行开发。SSH 远程开发配置安装扩展安装 "Remote - SSH" 扩展。配置 SSH 主机编辑 SSH 配置文件 ~/.ssh/config:Host myserver HostName 192.168.1.100 User username Port 22 IdentityFile ~/.ssh/id_rsa连接到远程主机按 F1 或 Ctrl+Shift+P 打开命令面板输入 "Remote-SSH: Connect to Host"选择配置的主机首次连接时需要确认主机指纹并选择远程平台(Linux/Windows/macOS);使用密码认证时需输入密码,使用带口令的密钥时需输入密钥口令。VS Code 会自动在远程主机上安装 VS Code Server远程开发工作流程文件操作在远程服务器上打开文件夹作为工作区所有文件操作都在远程服务器上执行本地 VS Code 作为界面和编辑器扩展管理本地扩展: 在本地运行,如主题、键盘快捷键远程扩展: 在远程服务器上运行,如语言服务器、调试器扩展会自动分类和安装终端使用集成终端在远程服务器上运行支持多个终端会话终端环境与远程服务器一致容器远程开发Docker 配置安装 "Dev Containers" 扩展(原名 "Remote - Containers")确保本地安装了 Docker在项目中创建 .devcontainer 文件夹devcontainer.json{ "name": "My Development Container", "image": "mcr.microsoft.com/devcontainers/javascript-node:18", "features": { "ghcr.io/devcontainers/features/node:1": {} }, "customizations": { "vscode": { "extensions": ["dbaeumer.vscode-eslint"] } }, "postCreateCommand": "npm install"}使用容器开发打开命令面板选择 "Dev Containers: Reopen in Container"VS Code 会在容器中重新打开项目WSL 远程开发配置 WSL安装 WSL 2安装 "WSL" 扩展(原名 "Remote - WSL")在 WSL 中打开项目(例如在 WSL 终端中进入项目目录后执行 code .)WSL 特性完整的 Linux 环境与 Windows 文件系统集成支持多个 WSL 发行版性能优化文件监视远程开发时文件保存在远端,并不会整体同步到本地;大型项目应通过 files.watcherExclude 和 search.exclude 设置排除 node_modules、构建产物等目录(.vscodeignore 仅用于扩展打包,对此无效)扩展优化只安装必要的远程扩展禁用不必要的扩展网络优化使用 SSH 密钥而非密码认证配置 SSH 连接保持活跃(如在 ~/.ssh/config 中设置 ServerAliveInterval)利用 SSH 端口转发访问远程服务,而不必对外开放端口常见问题解决连接失败检查 SSH 配置验证网络连接确认服务器 SSH 服务运行性能问题检查网络延迟优化文件监视排除规则减少扩展数量扩展问题某些扩展可能不支持远程开发检查扩展兼容性手动安装远程扩展注意事项远程开发需要稳定的网络连接确保远程服务器有足够的资源注意文件权限和路径问题敏感数据应存储在远程服务器定期备份远程代码
服务端阅读 02月18日 18:00

TensorFlow 中的模型保存和加载有哪些方法,如何进行模型部署

TensorFlow 提供了多种模型保存和加载的方法,以及灵活的模型部署选项。掌握这些技能对于生产环境中的深度学习应用至关重要。模型保存格式TensorFlow 支持多种模型保存格式:SavedModel 格式:TensorFlow 2.x 推荐的格式Keras H5 格式:传统的 Keras 模型格式TensorFlow Lite 格式:用于移动设备和嵌入式设备TensorFlow.js 格式:用于 Web 浏览器SavedModel 格式保存完整模型import tensorflow as tffrom tensorflow.keras import layers, models# 构建模型model = models.Sequential([ layers.Dense(64, activation='relu', input_shape=(10,)), layers.Dense(10, activation='softmax')])model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')# 保存为 SavedModel 格式model.save('saved_model/my_model')# SavedModel 目录结构:# saved_model/# ├── saved_model.pb# ├── variables/# └── assets/加载 SavedModel# 加载模型loaded_model = tf.keras.models.load_model('saved_model/my_model')# 使用模型predictions = loaded_model.predict(x_test)保存特定版本import tensorflow as tf# 保存模型并指定版本model.save('saved_model/my_model/1')# 保存多个版本model.save('saved_model/my_model/2')Keras H5 格式保存完整模型# 保存为 H5 格式model.save('my_model.h5')# 保存时包含优化器状态model.save('my_model_with_optimizer.h5', save_format='h5')加载 H5 模型# 加载模型loaded_model = tf.keras.models.load_model('my_model.h5')# 加载并继续训练loaded_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')loaded_model.fit(x_train, y_train, epochs=5)只保存模型架构# 保存模型架构为 JSONmodel_json = model.to_json()with open('model_architecture.json', 'w') as json_file: json_file.write(model_json)# 从 JSON 加载架构with open('model_architecture.json', 'r') as json_file: loaded_model_json = json_file.read()loaded_model = tf.keras.models.model_from_json(loaded_model_json)# 加载权重loaded_model.load_weights('model_weights.h5')只保存模型权重# 保存权重model.save_weights('model_weights.h5')# 加载权重model.load_weights('model_weights.h5')# 加载到不同的模型new_model = create_model()new_model.load_weights('model_weights.h5')检查点(Checkpoint)保存检查点from tensorflow.keras.callbacks import ModelCheckpoint# 创建检查点回调checkpoint_callback = ModelCheckpoint( filepath='checkpoints/model_{epoch:02d}.h5', save_weights_only=False, save_best_only=True, monitor='val_loss', mode='min', verbose=1)# 
训练时保存检查点model.fit( x_train, y_train, epochs=10, validation_data=(x_val, y_val), callbacks=[checkpoint_callback])手动保存检查点# 手动保存检查点model.save_weights('checkpoints/ckpt')# 同时保存优化器状态和模型(放在单独目录,避免与上面的 checkpoint 索引文件互相覆盖)optimizer_state = tf.train.Checkpoint(optimizer=optimizer, model=model)# save() 会在前缀后追加编号(如 checkpoints/full/ckpt-1),并返回实际保存路径save_path = optimizer_state.save('checkpoints/full/ckpt')恢复检查点# 恢复检查点model.load_weights('checkpoints/ckpt')# 恢复优化器状态(不能直接使用保存时的前缀,应使用 save() 返回的路径或最新检查点)optimizer_state = tf.train.Checkpoint(optimizer=optimizer, model=model)optimizer_state.restore(tf.train.latest_checkpoint('checkpoints/full'))TensorFlow Lite 部署转换为 TFLite 模型import tensorflow as tf# 转换模型converter = tf.lite.TFLiteConverter.from_keras_model(model)tflite_model = converter.convert()# 保存 TFLite 模型with open('model.tflite', 'wb') as f: f.write(tflite_model)优化 TFLite 模型# 量化模型converter = tf.lite.TFLiteConverter.from_keras_model(model)converter.optimizations = [tf.lite.Optimize.DEFAULT]tflite_quant_model = converter.convert()# 保存量化模型with open('model_quant.tflite', 'wb') as f: f.write(tflite_quant_model)在 Python 中运行 TFLite 模型import tensorflow as tfimport numpy as np# 加载 TFLite 模型interpreter = tf.lite.Interpreter(model_path='model.tflite')interpreter.allocate_tensors()# 获取输入输出张量input_details = interpreter.get_input_details()output_details = interpreter.get_output_details()# 准备输入数据input_data = np.array(np.random.random_sample(input_details[0]['shape']), dtype=np.float32)# 设置输入interpreter.set_tensor(input_details[0]['index'], input_data)# 运行推理interpreter.invoke()# 获取输出output_data = interpreter.get_tensor(output_details[0]['index'])print(output_data)在移动设备上部署Android 部署import org.tensorflow.lite.Interpreter;// 加载模型Interpreter interpreter = new Interpreter(loadModelFile());// 准备输入float[][] input = new float[1][10];// 运行推理float[][] output = new float[1][10];interpreter.run(input, output);iOS 部署import TensorFlowLite// 加载模型guard let interpreter = try? 
Interpreter(modelPath: "model.tflite") else { fatalError("Failed to load model")}// 注意:modelPath 需为完整文件路径,实际项目中通常通过 Bundle.main.path(forResource: "model", ofType: "tflite") 获取// 推理前必须先分配张量内存try interpreter.allocateTensors()// 准备输入(copy 接受 Data,需要先把 [Float] 转换为 Data)let input: [Float] = Array(repeating: 0.0, count: 10)let inputData = input.withUnsafeBufferPointer { Data(buffer: $0) }try interpreter.copy(inputData, toInputAt: 0)// 运行推理try interpreter.invoke()// 读取输出张量并转换为 [Float]let outputTensor = try interpreter.output(at: 0)let output = outputTensor.data.withUnsafeBytes { Array($0.bindMemory(to: Float.self)) }TensorFlow.js 部署转换为 TensorFlow.js 模型# 安装 tensorflowjs_converterpip install tensorflowjs# 转换模型tensorflowjs_converter --input_format keras \ my_model.h5 \ tfjs_model在浏览器中使用<!DOCTYPE html><html><head> <script src="https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest"></script></head><body> <script> // 加载模型 async function loadModel() { const model = await tf.loadLayersModel('tfjs_model/model.json'); return model; } // 运行推理 async function predict() { const model = await loadModel(); const input = tf.randomNormal([1, 10]); const output = model.predict(input); output.print(); } predict(); </script></body></html>TensorFlow Serving 部署导出模型import tensorflow as tf# 导出模型为 SavedModel 格式model.save('serving_model/1')使用 Docker 部署# 拉取 TensorFlow Serving 镜像docker pull tensorflow/serving# 运行 TensorFlow Serving(8500 为 gRPC 端口,8501 为 REST 端口)docker run -p 8500:8500 -p 8501:8501 \ --mount type=bind,source=$(pwd)/serving_model,target=/models/my_model \ -e MODEL_NAME=my_model \ -t tensorflow/serving &使用 REST API 调用import requestsimport jsonimport numpy as np# 准备输入数据input_data = np.random.random((1, 10)).tolist()# 发送请求response = requests.post( 'http://localhost:8501/v1/models/my_model:predict', json={'instances': input_data})# 获取预测结果predictions = response.json()['predictions']print(predictions)使用 gRPC 调用import grpcimport tensorflow as tffrom tensorflow_serving.apis import predict_pb2from tensorflow_serving.apis import prediction_service_pb2_grpcimport numpy as np# 创建 gRPC 连接channel = grpc.insecure_channel('localhost:8500')stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)# 创建预测请求request = predict_pb2.PredictRequest()request.model_spec.name = 'my_model'request.model_spec.signature_name = 'serving_default'# 设置输入数据input_data = 
np.random.random((1, 10)).astype(np.float32)# 输入名称必须与模型签名一致('input_1' 仅为示例,可用 saved_model_cli show --dir serving_model/1 --all 查看)request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_data))# 发送请求result = stub.Predict(request, timeout=10.0)print(result)云平台部署Google Cloud AI Platform(现为 Vertex AI)from google.cloud import aiplatform# 上传模型model = aiplatform.Model.upload( display_name='my_model', artifact_uri='gs://my-bucket/model', serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest')# 部署模型endpoint = model.deploy( machine_type='n1-standard-4', min_replica_count=1, max_replica_count=5)AWS SageMakerimport sagemakerfrom sagemaker.tensorflow import TensorFlowModel# 创建模型model = TensorFlowModel( model_data='s3://my-bucket/model.tar.gz', role='arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-ExecutionRole', framework_version='2.6.0')# 部署模型predictor = model.deploy( initial_instance_count=1, instance_type='ml.m5.xlarge')# 进行预测predictions = predictor.predict(input_data)模型版本管理保存多个版本import os# 保存不同版本的模型version = 1model.save(f'saved_model/my_model/{version}')# 更新版本version += 1model.save(f'saved_model/my_model/{version}')加载特定版本# 加载最新版本(load_model 不会自动选择版本,需要先找出数字最大的版本目录)latest_version = max(int(d) for d in os.listdir('saved_model/my_model') if d.isdigit())latest_model = tf.keras.models.load_model(f'saved_model/my_model/{latest_version}')# 加载特定版本version_1_model = tf.keras.models.load_model('saved_model/my_model/1')version_2_model = tf.keras.models.load_model('saved_model/my_model/2')模型优化模型剪枝import tensorflow_model_optimization as tfmot# 定义剪枝模型prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude# 定义剪枝计划:稀疏度在训练过程中从 0% 逐步提高到 50%pruning_params = { 'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay( initial_sparsity=0.0, final_sparsity=0.5, begin_step=0, end_step=1000)}# 应用剪枝(参数需以关键字形式传入)model_for_pruning = prune_low_magnitude(model, **pruning_params)# 剪枝包装后的模型需要重新编译model_for_pruning.compile(optimizer='adam', loss='sparse_categorical_crossentropy')# 训练剪枝模型(必须添加 UpdatePruningStep 回调,否则训练时会报错)model_for_pruning.fit(x_train, y_train, epochs=10, callbacks=[tfmot.sparsity.keras.UpdatePruningStep()])# 导出剪枝后的模型model_for_export = tfmot.sparsity.keras.strip_pruning(model_for_pruning)model_for_export.save('pruned_model')模型量化# 训练后量化converter = tf.lite.TFLiteConverter.from_keras_model(model)converter.optimizations = [tf.lite.Optimize.DEFAULT]quantized_model = converter.convert()# 保存量化模型with open('quantized_model.tflite', 'wb') as f: f.write(quantized_model)知识蒸馏# 定义教师模型和学生模型teacher_model = create_teacher_model()student_model = 
create_student_model()# 定义蒸馏损失(假设教师模型和学生模型的最后一层输出 logits,即不带 softmax 激活)def distillation_loss(y_true, y_pred, teacher_pred, temperature=3, alpha=0.1): # 硬标签损失:学生预测与真实整数标签之间的交叉熵(真实标签不应再做温度 softmax) hard_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)(y_true, y_pred) # 软标签损失:在温度 T 下让学生的输出分布逼近教师的输出分布 teacher_pred_soft = tf.nn.softmax(teacher_pred / temperature) y_pred_soft = tf.nn.softmax(y_pred / temperature) soft_loss = tf.keras.losses.KLDivergence()(teacher_pred_soft, y_pred_soft) # 乘以 T² 使软标签损失的梯度量级与硬标签损失相当 return alpha * hard_loss + (1 - alpha) * soft_loss * temperature ** 2# 训练学生模型optimizer = tf.keras.optimizers.Adam()for x_batch, y_batch in train_dataset: with tf.GradientTape() as tape: teacher_pred = teacher_model(x_batch, training=False) student_pred = student_model(x_batch, training=True) loss = distillation_loss(y_batch, student_pred, teacher_pred) gradients = tape.gradient(loss, student_model.trainable_variables) optimizer.apply_gradients(zip(gradients, student_model.trainable_variables))最佳实践使用 SavedModel 格式:TensorFlow 2.x 推荐的格式版本控制:为每个模型版本创建单独的目录模型签名:为模型定义清晰的输入输出签名测试部署:在部署前充分测试模型监控性能:监控部署后的模型性能安全考虑:保护模型文件和 API 端点文档记录:记录模型的使用方法和依赖项总结TensorFlow 提供了完整的模型保存、加载和部署解决方案:SavedModel:生产环境推荐格式Keras H5:快速原型开发TensorFlow Lite:移动和嵌入式设备TensorFlow.js:Web 浏览器部署TensorFlow Serving:生产环境服务掌握这些技术将帮助你将深度学习模型从开发环境成功部署到生产环境。
服务端阅读 02月18日 17:58

TensorFlow 中的数据预处理有哪些方法,如何高效加载和处理数据

数据预处理是深度学习流程中至关重要的一步,TensorFlow 提供了强大的数据预处理和加载工具。数据加载方法1. 从 NumPy 数组加载import numpy as npimport tensorflow as tf# 创建 NumPy 数组x_train = np.random.rand(1000, 28, 28, 1).astype(np.float32)y_train = np.random.randint(0, 10, size=(1000,))# 创建 Datasetdataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))# 打印数据形状for x, y in dataset.take(1): print("X shape:", x.shape) print("Y shape:", y.shape)2. 从文件加载从 CSV 文件加载# 从 CSV 文件创建 Datasetcsv_dataset = tf.data.experimental.make_csv_dataset( 'data.csv', batch_size=32, label_name='label', num_epochs=1, ignore_errors=True)# 或者使用 TextLineDatasetdef parse_csv(line): # 解析 CSV 行 parsed_line = tf.io.decode_csv(line, record_defaults=[0.0, 0.0, 0.0, 0]) features = parsed_line[:-1] label = parsed_line[-1] return features, labelcsv_dataset = tf.data.TextLineDataset('data.csv').skip(1).map(parse_csv)从图像文件加载# 从图像文件创建 Datasetimage_paths = tf.data.Dataset.list_files('images/*.jpg')def load_image(path): # 读取图像 image = tf.io.read_file(path) # 解码图像 image = tf.image.decode_jpeg(image, channels=3) # 调整大小 image = tf.image.resize(image, [224, 224]) # 归一化 image = image / 255.0 return imageimage_dataset = image_paths.map(load_image)从 TFRecord 文件加载# 从 TFRecord 文件创建 Datasettfrecord_dataset = tf.data.TFRecordDataset('data.tfrecord')def parse_tfrecord(example_proto): # 定义特征解析 feature_description = { 'image': tf.io.FixedLenFeature([], tf.string), 'label': tf.io.FixedLenFeature([], tf.int64), } # 解析示例 example = tf.io.parse_single_example(example_proto, feature_description) # 解码图像 image = tf.io.decode_jpeg(example['image'], channels=3) image = tf.image.resize(image, [224, 224]) image = image / 255.0 return image, example['label']tfrecord_dataset = tfrecord_dataset.map(parse_tfrecord)3. 
从 Pandas DataFrame 加载import pandas as pd# 创建 DataFramedf = pd.DataFrame({ 'feature1': np.random.rand(1000), 'feature2': np.random.rand(1000), 'label': np.random.randint(0, 2, size=1000)})# 从 DataFrame 创建 Datasetdataset = tf.data.Dataset.from_tensor_slices(( df[['feature1', 'feature2']].values, df['label'].values))数据预处理方法1. 图像预处理# 图像数据增强def augment_image(image, label): # 随机翻转 image = tf.image.random_flip_left_right(image) image = tf.image.random_flip_up_down(image) # 随机旋转 image = tf.image.rot90(image, k=tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)) # 随机亮度调整 image = tf.image.random_brightness(image, max_delta=0.2) # 随机对比度调整 image = tf.image.random_contrast(image, lower=0.8, upper=1.2) # 随机饱和度调整 image = tf.image.random_saturation(image, lower=0.8, upper=1.2) # 随机裁剪 image = tf.image.random_crop(image, size=[200, 200, 3]) image = tf.image.resize(image, [224, 224]) return image, label# 应用数据增强augmented_dataset = dataset.map(augment_image)2. 文本预处理# 文本预处理(模块使用别名 tf_text,避免被下文函数的 text 参数遮蔽)import collectionsimport tensorflow_text as tf_text# 文本标准化def normalize_text(text): # 转换为小写 text = tf.strings.lower(text) # 去除标点符号(tf.strings 使用 RE2 语法,其中 \w 只匹配 ASCII 字符,[^\w\s] 会误删中文等字符,因此改用 Unicode 标点类 \pP) text = tf.strings.regex_replace(text, r'\pP', '') # 去除首尾空白 text = tf.strings.strip(text) return text# 文本分词def tokenize_text(text): # 使用 Unicode 分词器 tokenizer = tf_text.UnicodeScriptTokenizer() tokens = tokenizer.tokenize(text) return tokens# 构建词汇表def build_vocabulary(dataset, vocab_size=10000): # 统计词频 vocab = collections.Counter() for text in dataset: tokens = tokenize_text(normalize_text(text)) vocab.update(tokens.numpy()) # 选择最常见的词 most_common = vocab.most_common(vocab_size) vocab_list = [word for word, _ in most_common] # 添加特殊标记 vocab_list = ['<PAD>', '<UNK>', '<START>', '<END>'] + vocab_list return vocab_list# 文本编码def encode_text(text, vocab, max_length=100): # vocab 可以是 build_vocabulary 返回的列表,也可以是 {词: 索引} 字典 word_to_id = vocab if isinstance(vocab, dict) else {word: idx for idx, word in enumerate(vocab)} # 分词 tokens = tokenize_text(normalize_text(text)) # 转换为索引(Tensor 不可哈希,先转换为 numpy 字节串再查表) indices = [word_to_id.get(token, word_to_id['<UNK>']) for token in tokens.numpy()] # 截断或填充 if len(indices) > max_length: indices = indices[:max_length] else: indices = indices + [word_to_id['<PAD>']] * 
(max_length - len(indices)) return tf.constant(indices)3. 数值数据预处理# 数值数据标准化def normalize_features(features): # 计算均值和标准差 mean = tf.reduce_mean(features, axis=0) std = tf.math.reduce_std(features, axis=0) # 标准化 normalized = (features - mean) / (std + 1e-7) return normalized# 数值数据归一化def min_max_normalize(features): # 计算最小值和最大值 min_val = tf.reduce_min(features, axis=0) max_val = tf.reduce_max(features, axis=0) # 归一化到 [0, 1] normalized = (features - min_val) / (max_val - min_val + 1e-7) return normalized# 数值数据标准化(使用预计算的统计量)class StandardScaler: def __init__(self): self.mean = None self.std = None def fit(self, data): self.mean = tf.reduce_mean(data, axis=0) self.std = tf.math.reduce_std(data, axis=0) def transform(self, data): return (data - self.mean) / (self.std + 1e-7) def fit_transform(self, data): self.fit(data) return self.transform(data)4. 类别编码# One-Hot 编码def one_hot_encode(labels, num_classes): return tf.one_hot(labels, num_classes)# 标签编码def label_encode(labels, label_map): return tf.map_fn(lambda x: label_map[x.numpy()], labels, dtype=tf.int32)# 构建标签映射def build_label_map(labels): unique_labels = tf.unique(labels).y label_map = {label: idx for idx, label in enumerate(unique_labels.numpy())} return label_mapDataset 操作1. 批处理# 批处理batched_dataset = dataset.batch(32)# 带填充的批处理(用于变长序列)padded_batch_dataset = dataset.padded_batch( batch_size=32, padded_shapes=([None], []), # 特征和标签的填充形状 padding_values=(0.0, 0) # 填充值)2. 打乱数据# 打乱数据shuffled_dataset = dataset.shuffle(buffer_size=1000)# 打乱并批处理shuffled_batched_dataset = dataset.shuffle(buffer_size=1000).batch(32)3. 重复数据# 重复数据repeated_dataset = dataset.repeat(count=2) # 重复 2 次# 无限重复infinite_dataset = dataset.repeat()4. 映射操作# 应用函数到每个元素mapped_dataset = dataset.map(lambda x, y: (x * 2, y))# 并行映射parallel_mapped_dataset = dataset.map( lambda x, y: (x * 2, y), num_parallel_calls=tf.data.AUTOTUNE)5. 
过滤数据# 过滤数据filtered_dataset = dataset.filter(lambda x, y: y > 5)# 过滤并映射filtered_mapped_dataset = dataset.filter( lambda x, y: y > 5).map(lambda x, y: (x, y - 5))6. 取数据# 取前 N 个元素taken_dataset = dataset.take(100)# 跳过前 N 个元素skipped_dataset = dataset.skip(100)# 先跳过前 100 个元素,再取接下来的 50 个taken_skipped_dataset = dataset.skip(100).take(50)7. 预取数据# 预取数据(提高性能)prefetched_dataset = dataset.prefetch(tf.data.AUTOTUNE)# 完整的数据管道optimized_dataset = ( dataset .shuffle(buffer_size=1000) .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE) .batch(32) .prefetch(tf.data.AUTOTUNE))高效数据加载技巧1. 使用缓存# 缓存数据(适合小数据集)cached_dataset = dataset.cache()# 缓存到文件file_cached_dataset = dataset.cache('cache_dir')2. 并行处理# 并行映射parallel_dataset = dataset.map( preprocess, num_parallel_calls=tf.data.AUTOTUNE)# 并行读取多个 TFRecord 文件(传给 TFRecordDataset 的必须是 TFRecord 文件,而不是 JPG 图像)parallel_read_dataset = tf.data.Dataset.list_files( 'data/*.tfrecord', shuffle=False).interleave( tf.data.TFRecordDataset, cycle_length=4, num_parallel_calls=tf.data.AUTOTUNE)3. 数据压缩# 读取 GZIP 压缩的 TFRecord 文件(以少量解压 CPU 开销换取更小的磁盘/网络 I/O)compressed_dataset = tf.data.Dataset.list_files('data/*.tfrecord.gz').interleave( lambda path: tf.data.TFRecordDataset(path, compression_type='GZIP'), cycle_length=4, num_parallel_calls=tf.data.AUTOTUNE)# 写入时对应使用 tf.io.TFRecordWriter(path, options='GZIP')4. 使用生成器# 从 Python 生成器创建 Datasetdef data_generator(): for i in range(1000): yield np.random.rand(28, 28, 1), np.random.randint(0, 10)generator_dataset = tf.data.Dataset.from_generator( data_generator, output_signature=( tf.TensorSpec(shape=(28, 28, 1), dtype=tf.float32), tf.TensorSpec(shape=(), dtype=tf.int64) ))完整的数据预处理流程import tensorflow as tfimport numpy as np# 1. 加载数据def load_data(): # 创建模拟数据(0-255 的原始像素值,与 preprocess 中的 / 255.0 归一化相对应) x_train = np.random.randint(0, 256, size=(1000, 28, 28, 1)).astype(np.float32) y_train = np.random.randint(0, 10, size=(1000,)) # 创建 Dataset dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)) return dataset# 2. 数据预处理def preprocess(image, label): # 归一化 image = image / 255.0 # 随机数据增强(以 50% 的概率执行;只应用于训练集) if tf.random.uniform(()) > 0.5: image = tf.image.random_flip_left_right(image) image = tf.image.random_brightness(image, max_delta=0.1) return image, label# 3. 
创建数据管道def create_dataset(dataset, batch_size=32, shuffle=True, augment=True): # 打乱数据 if shuffle: dataset = dataset.shuffle(buffer_size=1000) # 应用预处理(augment=False 时只做归一化,不做随机增强) if augment: dataset = dataset.map( preprocess, num_parallel_calls=tf.data.AUTOTUNE ) else: dataset = dataset.map( lambda image, label: (image / 255.0, label), num_parallel_calls=tf.data.AUTOTUNE ) # 批处理 dataset = dataset.batch(batch_size) # 预取 dataset = dataset.prefetch(tf.data.AUTOTUNE) return dataset# 4. 使用数据管道# 加载数据raw_dataset = load_data()# 在批处理之前划分互不重叠的验证集和训练集(不能从已批处理的训练集中 take,否则会重复批处理且与训练数据重叠)val_raw = raw_dataset.take(200)train_raw = raw_dataset.skip(200)# 创建训练数据集train_dataset = create_dataset(train_raw, batch_size=32, shuffle=True, augment=True)# 创建验证数据集val_dataset = create_dataset(val_raw, batch_size=32, shuffle=False, augment=False)# 训练模型model.fit(train_dataset, epochs=10, validation_data=val_dataset)数据预处理最佳实践1. 数据管道优化# 优化的数据管道optimized_pipeline = ( dataset .cache() # 缓存数据 .shuffle(buffer_size=10000) # 打乱数据 .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE) # 并行预处理 .batch(32) # 批处理 .prefetch(tf.data.AUTOTUNE) # 预取数据)2. 内存管理# 使用生成器减少内存使用def lazy_load_data(): for file_path in file_paths: data = load_file(file_path) yield datalazy_dataset = tf.data.Dataset.from_generator( lazy_load_data, output_signature=...)3. 数据验证# 验证数据def validate_data(dataset): for x, y in dataset.take(1): print(f"X shape: {x.shape}, dtype: {x.dtype}") print(f"Y shape: {y.shape}, dtype: {y.dtype}") # 检查数值范围 print(f"X range: [{tf.reduce_min(x):.2f}, {tf.reduce_max(x):.2f}]") # 检查 NaN 或 Inf if tf.reduce_any(tf.math.is_nan(x)): print("Warning: NaN detected in X!") if tf.reduce_any(tf.math.is_inf(x)): print("Warning: Inf detected in X!")# 使用验证validate_data(train_dataset)4. 
数据可视化import matplotlib.pyplot as plt# 可视化数据(dataset 的每个元素应为单个样本,而不是一个批次)def visualize_data(dataset, num_samples=5): fig, axes = plt.subplots(1, num_samples, figsize=(15, 3)) for i, (x, y) in enumerate(dataset.take(num_samples)): axes[i].imshow(x.numpy().squeeze(), cmap='gray') axes[i].set_title(f'Label: {y.numpy()}') axes[i].axis('off') plt.tight_layout() plt.show()# 使用可视化(train_dataset 已经批处理,先 unbatch 还原为单个样本)visualize_data(train_dataset.unbatch())总结TensorFlow 提供了强大的数据预处理和加载工具:数据加载:支持多种数据源(NumPy、文件、TFRecord 等)数据预处理:图像、文本、数值数据的预处理方法Dataset 操作:批处理、打乱、映射、过滤等操作高效加载:缓存、并行处理、预取等优化技巧最佳实践:数据管道优化、内存管理、数据验证掌握这些数据预处理技术将帮助你更高效地构建和训练深度学习模型。
服务端阅读 02月18日 17:58

TensorFlow 中的评估指标有哪些,如何自定义评估指标

评估指标(Metrics)用于评估模型性能,是深度学习模型开发和调优的重要工具。常用评估指标1. 分类指标准确率(Accuracy)from tensorflow.keras.metrics import Accuracy# 使用准确率指标accuracy = Accuracy()# 计算准确率y_true = tf.constant([0, 1, 1, 0, 1])y_pred = tf.constant([0, 1, 0, 0, 1])accuracy.update_state(y_true, y_pred)result = accuracy.result()print(result) # 0.8# 在模型编译中使用model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])特点:直观易懂适合平衡数据集类别不平衡时容易产生误导(例如全部预测为多数类也能得到很高的准确率)适用场景:平衡的分类任务需要简单评估的场景精确率(Precision)from tensorflow.keras.metrics import Precision# 使用精确率指标precision = Precision()# 计算精确率y_true = tf.constant([0, 1, 1, 0, 1])y_pred = tf.constant([0, 1, 0, 0, 1])precision.update_state(y_true, y_pred)result = precision.result()print(result) # 1.0# 在模型编译中使用model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[Precision()])特点:衡量预测为正类的准确性适合关注假阳性的场景在类别不平衡时比准确率更有参考价值适用场景:垃圾邮件检测医疗诊断需要减少假阳性的场景召回率(Recall)from tensorflow.keras.metrics import Recall# 使用召回率指标recall = Recall()# 计算召回率y_true = tf.constant([0, 1, 1, 0, 1])y_pred = tf.constant([0, 1, 0, 0, 1])recall.update_state(y_true, y_pred)result = recall.result()print(result) # 0.666...# 在模型编译中使用model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[Recall()])特点:衡量正类样本的识别能力适合关注假阴性的场景在类别不平衡时比准确率更有参考价值适用场景:疾病筛查异常检测需要减少假阴性的场景F1 分数(F1 Score)# tf.keras.metrics.F1Score 自 TF 2.13 起内置,没有 num_classes 参数(num_classes 属于 tensorflow_addons 中的旧版本)from tensorflow.keras.metrics import F1Score# 使用 F1 分数指标f1 = F1Score(threshold=0.5)# 计算 F1 分数(标签使用浮点数,与 y_pred 类型一致)y_true = tf.constant([[0., 1.], [1., 0.], [1., 0.], [0., 1.], [1., 0.]])y_pred = tf.constant([[0.1, 0.9], [0.8, 0.2], [0.3, 0.7], [0.2, 0.8], [0.9, 0.1]])f1.update_state(y_true, y_pred)result = f1.result()print(result) # [0.8, 0.8](默认 average=None,返回每个类别的 F1)# 在模型编译中使用model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[F1Score(threshold=0.5)])特点:精确率和召回率的调和平均平衡精确率和召回率适合不平衡数据集适用场景:不平衡分类任务需要平衡精确率和召回率的场景AUC-ROCfrom tensorflow.keras.metrics import AUC# 使用 AUC 指标auc = AUC()# 计算 AUCy_true = tf.constant([0, 1, 1, 0, 1])y_pred = tf.constant([0.1, 0.9, 0.8, 0.2, 0.7])auc.update_state(y_true, y_pred)result = auc.result()print(result) # ≈1.0(所有正样本的得分都高于负样本)# 
在模型编译中使用model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[AUC()])特点:衡量分类器的整体排序性能不依赖单一分类阈值(综合考虑所有阈值)适合二分类问题适用场景:二分类任务需要评估整体性能的场景2. 回归指标均方误差(MSE)from tensorflow.keras.metrics import MeanSquaredError# 使用 MSE 指标mse = MeanSquaredError()# 计算 MSEy_true = tf.constant([1.0, 2.0, 3.0, 4.0])y_pred = tf.constant([1.1, 2.2, 2.9, 4.1])mse.update_state(y_true, y_pred)result = mse.result()print(result) # 0.0175# 在模型编译中使用model.compile(optimizer='adam', loss='mse', metrics=[MeanSquaredError()])特点:衡量预测值与真实值的差异对异常值敏感适合连续值预测适用场景:回归任务需要精确预测的场景平均绝对误差(MAE)from tensorflow.keras.metrics import MeanAbsoluteError# 使用 MAE 指标mae = MeanAbsoluteError()# 计算 MAEy_true = tf.constant([1.0, 2.0, 3.0, 4.0])y_pred = tf.constant([1.1, 2.2, 2.9, 4.1])mae.update_state(y_true, y_pred)result = mae.result()print(result) # 0.125# 在模型编译中使用model.compile(optimizer='adam', loss='mae', metrics=[MeanAbsoluteError()])特点:衡量预测值与真实值的绝对差异相比 MSE 对异常值不那么敏感适合有异常值的回归任务适用场景:回归任务有异常值的数据平均绝对百分比误差(MAPE)# 自定义 MAPE 指标(Keras 也内置了 tf.keras.metrics.MeanAbsolutePercentageError)def mean_absolute_percentage_error(y_true, y_pred): y_true = tf.cast(y_true, tf.float32) y_pred = tf.cast(y_pred, tf.float32) diff = tf.abs((y_true - y_pred) / y_true) return 100.0 * tf.reduce_mean(diff)# 使用 MAPEy_true = tf.constant([100.0, 200.0, 300.0])y_pred = tf.constant([110.0, 190.0, 310.0])mape = mean_absolute_percentage_error(y_true, y_pred)print(mape) # 6.111...(即 (10% + 5% + 3.33%) / 3)特点:衡量预测值的百分比误差直观易懂真实值接近 0 时误差会被急剧放大(等于 0 时会除零)适用场景:需要百分比误差的场景时间序列预测R² 分数(R-squared)# 自定义 R² 指标def r_squared(y_true, y_pred): y_true = tf.cast(y_true, tf.float32) y_pred = tf.cast(y_pred, tf.float32) ss_res = tf.reduce_sum(tf.square(y_true - y_pred)) ss_tot = tf.reduce_sum(tf.square(y_true - tf.reduce_mean(y_true))) return 1 - ss_res / (ss_tot + tf.keras.backend.epsilon())# 使用 R²y_true = tf.constant([1.0, 2.0, 3.0, 4.0])y_pred = tf.constant([1.1, 2.2, 2.9, 4.1])r2 = r_squared(y_true, y_pred)print(r2) # 0.986(即 1 - 0.07 / 5)特点:衡量模型解释的方差比例范围在 (-∞, 1] 之间1 表示完美拟合适用场景:回归任务需要评估模型解释能力的场景3. 
其他指标Top-K 准确率from tensorflow.keras.metrics import TopKCategoricalAccuracy# 使用 Top-5 准确率top5_acc = TopKCategoricalAccuracy(k=5)# 计算 Top-5 准确率y_true = tf.constant([[0, 0, 1, 0, 0, 0, 0, 0, 0, 0]])y_pred = tf.constant([[0.1, 0.2, 0.3, 0.1, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05]])top5_acc.update_state(y_true, y_pred)result = top5_acc.result()print(result) # 1.0# 在模型编译中使用model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=[TopKCategoricalAccuracy(k=5)])特点:衡量预测是否在前 K 个最高概率中适合多分类任务常用于图像分类适用场景:大规模多分类任务图像分类推荐系统混淆矩阵from sklearn.metrics import confusion_matriximport numpy as np# 计算混淆矩阵(行为真实类别,列为预测类别)y_true = np.array([0, 1, 1, 0, 1, 0, 1, 0])y_pred = np.array([0, 1, 0, 0, 1, 1, 1, 0])cm = confusion_matrix(y_true, y_pred)print(cm)# [[3 1]# [1 3]]# 可视化混淆矩阵import matplotlib.pyplot as pltimport seaborn as snsplt.figure(figsize=(8, 6))sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')plt.xlabel('Predicted')plt.ylabel('True')plt.title('Confusion Matrix')plt.show()特点:详细展示分类结果适合多分类任务可视化分类性能适用场景:多分类任务需要详细分析分类结果的场景自定义评估指标1. 基本自定义指标# 定义自定义指标def custom_metric(y_true, y_pred): # 先将标签转换为与预测相同的数据类型,避免整数标签与浮点预测相减时报错 y_true = tf.cast(y_true, y_pred.dtype) # 计算自定义指标(这里是平均绝对误差) return tf.reduce_mean(tf.abs(y_true - y_pred))# 使用自定义指标model.compile(optimizer='adam', loss='mse', metrics=[custom_metric])2. 
类形式的自定义指标# 定义类形式的自定义指标(统计绝对误差超过 0.5 的元素比例)class CustomMetric(tf.keras.metrics.Metric): def __init__(self, name='custom_metric', **kwargs): super(CustomMetric, self).__init__(name=name, **kwargs) self.count = self.add_weight(name='count', initializer='zeros') self.total = self.add_weight(name='total', initializer='zeros') def update_state(self, y_true, y_pred, sample_weight=None): # 更新状态(先统一数据类型) y_true = tf.cast(y_true, y_pred.dtype) diff = tf.abs(y_true - y_pred) if sample_weight is not None: diff = diff * tf.cast(sample_weight, diff.dtype) self.count.assign_add(tf.reduce_sum(tf.cast(diff > 0.5, tf.float32))) self.total.assign_add(tf.cast(tf.size(diff), tf.float32)) def result(self): # 计算结果(尚未累积任何数据时返回 0,避免除零) return tf.math.divide_no_nan(self.count, self.total) def reset_state(self): # 重置状态(新版 Keras 使用 reset_state,reset_states 已弃用) self.count.assign(0.0) self.total.assign(0.0)# 使用自定义指标custom_metric = CustomMetric()model.compile(optimizer='adam', loss='mse', metrics=[custom_metric])3. 多标签分类指标# 定义多标签准确率(样本的所有标签都预测正确才算正确)def multilabel_accuracy(y_true, y_pred): # 将概率转换为二进制 y_pred_binary = tf.cast(y_pred > 0.5, tf.float32) # 计算每个样本的准确率(先将标签转换为相同类型再比较) sample_accuracy = tf.reduce_all( tf.equal(tf.cast(y_true, tf.float32), y_pred_binary), axis=1 ) # 计算整体准确率 return tf.reduce_mean(tf.cast(sample_accuracy, tf.float32))# 使用多标签准确率model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[multilabel_accuracy])4. 
IoU(交并比)# 定义 IoU 指标class IoU(tf.keras.metrics.Metric): def __init__(self, num_classes, name='iou', **kwargs): super(IoU, self).__init__(name=name, **kwargs) self.num_classes = num_classes self.intersection = self.add_weight( name='intersection', shape=(num_classes,), initializer='zeros' ) self.union = self.add_weight( name='union', shape=(num_classes,), initializer='zeros' ) def update_state(self, y_true, y_pred, sample_weight=None): # 将预测和标签转换为类别索引,并展平为一维 y_pred = tf.reshape(tf.argmax(y_pred, axis=-1), [-1]) y_true = tf.reshape(tf.argmax(y_true, axis=-1), [-1]) # 用 one-hot 一次性计算所有类别的交集和并集 # 注意:变量切片 self.intersection[i] 只支持 assign,不支持 assign_add,因此按整个向量累加 true_onehot = tf.one_hot(y_true, self.num_classes) pred_onehot = tf.one_hot(y_pred, self.num_classes) intersection = tf.reduce_sum(true_onehot * pred_onehot, axis=0) union = tf.reduce_sum(true_onehot, axis=0) + tf.reduce_sum(pred_onehot, axis=0) - intersection self.intersection.assign_add(intersection) self.union.assign_add(union) def result(self): # 计算每个类别的 IoU(如需平均 IoU,可对结果取 tf.reduce_mean,或直接使用内置的 tf.keras.metrics.MeanIoU) return self.intersection / (self.union + tf.keras.backend.epsilon()) def reset_states(self): # 重置状态 self.intersection.assign(tf.zeros_like(self.intersection)) self.union.assign(tf.zeros_like(self.union))# 使用 IoU 指标iou = IoU(num_classes=10)model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=[iou])5. 
Dice 系数# 定义 Dice 系数指标class DiceCoefficient(tf.keras.metrics.Metric): def __init__(self, name='dice_coefficient', **kwargs): super(DiceCoefficient, self).__init__(name=name, **kwargs) self.intersection = self.add_weight(name='intersection', initializer='zeros') self.total = self.add_weight(name='total', initializer='zeros') def update_state(self, y_true, y_pred, sample_weight=None): # 将预测转换为二进制 y_pred_binary = tf.cast(y_pred > 0.5, tf.float32) # 计算交集,以及标签与预测中正例数量之和(|A| + |B|) intersection = tf.reduce_sum(y_true * y_pred_binary) total = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred_binary) self.intersection.assign_add(intersection) self.total.assign_add(total) def result(self): # 计算 Dice 系数:2|A∩B| / (|A| + |B|) return 2.0 * self.intersection / (self.total + tf.keras.backend.epsilon()) def reset_states(self): # 重置状态 self.intersection.assign(0.0) self.total.assign(0.0)# 使用 Dice 系数指标dice = DiceCoefficient()model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[dice])评估指标组合使用1. 多指标评估# 组合多个评估指标model.compile( optimizer='adam', loss='categorical_crossentropy', metrics=[ 'accuracy', Precision(name='precision'), Recall(name='recall'), F1Score(average='macro', name='f1_score'), # tf.keras.metrics.F1Score(TF 2.13+)使用 average 参数,没有 num_classes 参数 TopKCategoricalAccuracy(k=5, name='top5_accuracy') ])2. 
条件指标# 定义条件指标class ConditionalAccuracy(tf.keras.metrics.Metric): def __init__(self, condition_fn, name='conditional_accuracy', **kwargs): super(ConditionalAccuracy, self).__init__(name=name, **kwargs) self.condition_fn = condition_fn self.correct = self.add_weight(name='correct', initializer='zeros') self.total = self.add_weight(name='total', initializer='zeros') def update_state(self, y_true, y_pred, sample_weight=None): # 应用条件函数 mask = self.condition_fn(y_true, y_pred) # 计算准确率 y_pred_class = tf.argmax(y_pred, axis=-1) y_true_class = tf.argmax(y_true, axis=-1) correct = tf.cast(tf.equal(y_pred_class, y_true_class), tf.float32) correct = correct * tf.cast(mask, tf.float32) self.correct.assign_add(tf.reduce_sum(correct)) self.total.assign_add(tf.reduce_sum(tf.cast(mask, tf.float32))) def result(self): return self.correct / (self.total + tf.keras.backend.epsilon()) def reset_states(self): self.correct.assign(0.0) self.total.assign(0.0)# 使用条件指标(例如只计算正类的准确率)positive_condition = lambda y_true, y_pred: tf.reduce_any(y_true > 0.5, axis=-1)positive_accuracy = ConditionalAccuracy(positive_condition, name='positive_accuracy')model.compile( optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', positive_accuracy])评估指标最佳实践1. 根据任务选择合适的指标# 分类任务model.compile( optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy', 'precision', 'recall', 'f1_score'])# 回归任务model.compile( optimizer='adam', loss='mse', metrics=['mae', 'mse'])# 不平衡分类任务model.compile( optimizer='adam', loss='binary_crossentropy', metrics=['precision', 'recall', 'auc'])2. 使用多个指标全面评估# 组合多个指标model.compile( optimizer='adam', loss='categorical_crossentropy', metrics=[ 'accuracy', Precision(name='precision'), Recall(name='recall'), AUC(name='auc'), TopKCategoricalAccuracy(k=5, name='top5_accuracy') ])3. 
监控指标变化# 自定义回调函数监控指标class MetricsMonitor(tf.keras.callbacks.Callback): def on_epoch_end(self, epoch, logs=None): print(f"Epoch {epoch}:") print(f" Accuracy: {logs['accuracy']:.4f}") print(f" Precision: {logs['precision']:.4f}") print(f" Recall: {logs['recall']:.4f}") print(f" AUC: {logs['auc']:.4f}")# 使用监控回调model.fit(x_train, y_train, validation_data=(x_val, y_val), callbacks=[MetricsMonitor()])4. 可视化指标import matplotlib.pyplot as plt# 绘制指标曲线def plot_metrics(history): fig, axes = plt.subplots(2, 2, figsize=(15, 10)) # 准确率 axes[0, 0].plot(history.history['accuracy'], label='Training Accuracy') axes[0, 0].plot(history.history['val_accuracy'], label='Validation Accuracy') axes[0, 0].set_title('Accuracy') axes[0, 0].set_xlabel('Epoch') axes[0, 0].set_ylabel('Accuracy') axes[0, 0].legend() # 精确率 axes[0, 1].plot(history.history['precision'], label='Training Precision') axes[0, 1].plot(history.history['val_precision'], label='Validation Precision') axes[0, 1].set_title('Precision') axes[0, 1].set_xlabel('Epoch') axes[0, 1].set_ylabel('Precision') axes[0, 1].legend() # 召回率 axes[1, 0].plot(history.history['recall'], label='Training Recall') axes[1, 0].plot(history.history['val_recall'], label='Validation Recall') axes[1, 0].set_title('Recall') axes[1, 0].set_xlabel('Epoch') axes[1, 0].set_ylabel('Recall') axes[1, 0].legend() # AUC axes[1, 1].plot(history.history['auc'], label='Training AUC') axes[1, 1].plot(history.history['val_auc'], label='Validation AUC') axes[1, 1].set_title('AUC') axes[1, 1].set_xlabel('Epoch') axes[1, 1].set_ylabel('AUC') axes[1, 1].legend() plt.tight_layout() plt.show()# 使用history = model.fit(x_train, y_train, validation_data=(x_val, y_val), epochs=50)plot_metrics(history)总结TensorFlow 提供了丰富的评估指标:分类指标:Accuracy、Precision、Recall、F1 Score、AUC-ROC回归指标:MSE、MAE、MAPE、R²其他指标:Top-K Accuracy、混淆矩阵、IoU、Dice自定义指标:可以创建自定义评估指标满足特定需求指标组合:可以组合多个指标全面评估模型性能选择合适的评估指标需要考虑任务类型、数据特性和业务需求。通过多个指标的组合使用,可以更全面地评估模型性能。
服务端阅读 02月18日 17:57

TensorFlow 中的 TensorBoard 是什么,如何使用它来监控训练过程

TensorBoard 是 TensorFlow 提供的可视化工具,用于监控和分析机器学习模型的训练过程。它提供了丰富的可视化功能,帮助开发者更好地理解模型性能和调试问题。TensorBoard 概述TensorBoard 是一个基于 Web 的可视化界面,可以实时显示:损失和指标的变化模型架构图权重和偏置的分布嵌入向量的可视化图像和音频数据文本数据性能分析基本使用1. 安装 TensorBoardpip install tensorboard2. 启动 TensorBoard# 基本启动tensorboard --logdir logs/# 指定端口tensorboard --logdir logs/ --port 6006# 在后台运行tensorboard --logdir logs/ --host 0.0.0.0 &3. 访问 TensorBoard在浏览器中打开:http://localhost:6006使用 Keras Callback基本用法import tensorflow as tffrom tensorflow.keras import layers, models, callbacks# 创建 TensorBoard 回调tensorboard_callback = callbacks.TensorBoard( log_dir='logs/fit', histogram_freq=1, write_graph=True, write_images=True, update_freq='epoch')# 构建模型model = models.Sequential([ layers.Dense(64, activation='relu', input_shape=(10,)), layers.Dense(10, activation='softmax')])model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')# 训练模型model.fit( x_train, y_train, epochs=10, validation_data=(x_val, y_val), callbacks=[tensorboard_callback])高级配置import datetime# 创建带时间戳的日志目录log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")tensorboard_callback = callbacks.TensorBoard( log_dir=log_dir, histogram_freq=1, # 记录权重直方图 write_graph=True, # 记录计算图 write_images=True, # 记录权重图像 update_freq='batch', # 每个 batch 更新 profile_batch='500,520', # 性能分析 embeddings_freq=1, # 记录嵌入 embeddings_metadata={'embedding_layer': 'metadata.tsv'})手动记录数据使用 tf.summaryimport tensorflow as tf# 创建摘要写入器log_dir = 'logs/manual'writer = tf.summary.create_file_writer(log_dir)# 记录标量with writer.as_default(): for step in range(100): loss = 1.0 / (step + 1) tf.summary.scalar('loss', loss, step=step) tf.summary.scalar('accuracy', step / 100, step=step)writer.close()记录不同类型的数据import tensorflow as tfimport numpy as nplog_dir = 'logs/various_types'writer = tf.summary.create_file_writer(log_dir)with writer.as_default(): # 记录标量 tf.summary.scalar('learning_rate', 0.001, step=0) # 记录直方图 weights = np.random.normal(0, 1, 1000) tf.summary.histogram('weights', weights, step=0) # 
记录图像 image = np.random.randint(0, 255, (28, 28, 3), dtype=np.uint8) tf.summary.image('sample_image', image[np.newaxis, ...], step=0) # 记录文本 tf.summary.text('log_message', 'Training started', step=0) # 记录音频:数据形状须为 [片段数, 采样点数, 声道数],取值范围为 [-1, 1] audio = np.random.uniform(-1.0, 1.0, 16000).astype(np.float32) # 1秒单声道音频 tf.summary.audio('sample_audio', audio[np.newaxis, :, np.newaxis], sample_rate=16000, step=0)writer.close()自定义训练循环中的记录import tensorflow as tffrom tensorflow.keras import optimizers, losseslog_dir = 'logs/custom_training'writer = tf.summary.create_file_writer(log_dir)model = create_model()optimizer = optimizers.Adam(learning_rate=0.001)loss_fn = losses.SparseCategoricalCrossentropy()# 注意:不要把每步都变化的 Python 整数(如 step)传给 tf.function,否则每次调用都会重新追踪计算图@tf.functiondef train_step(x_batch, y_batch): with tf.GradientTape() as tape: predictions = model(x_batch, training=True) loss = loss_fn(y_batch, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) return loss@tf.functiondef val_step(x_batch, y_batch): predictions = model(x_batch, training=False) return loss_fn(y_batch, predictions)step = 0for epoch in range(10): for x_batch, y_batch in train_dataset: loss = train_step(x_batch, y_batch) # 记录损失 with writer.as_default(): tf.summary.scalar('train_loss', loss, step=step) step += 1 # 记录验证损失(若模型未 compile,model.evaluate 会报错;只有 loss 时 evaluate 返回标量而非列表,因此这里手动计算平均验证损失) val_loss = tf.reduce_mean([val_step(x_batch, y_batch) for x_batch, y_batch in val_dataset]) with writer.as_default(): tf.summary.scalar('val_loss', val_loss, step=step)writer.close()可视化模型架构import tensorflow as tffrom tensorflow.keras import layers, models# 构建模型model = models.Sequential([ layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)), layers.MaxPooling2D((2, 2)), layers.Conv2D(64, (3, 3), activation='relu'), layers.MaxPooling2D((2, 2)), layers.Flatten(), layers.Dense(128, activation='relu'), layers.Dense(10, activation='softmax')])# 保存模型图(Keras 模型本身没有 get_concrete_function,需先用 tf.function 包装;tf.summary.graph 接收的是 tf.Graph)log_dir = 'logs/graph'writer = tf.summary.create_file_writer(log_dir)concrete_fn = tf.function(model).get_concrete_function( tf.TensorSpec(shape=(None, 28, 28, 1), dtype=tf.float32))with writer.as_default(): tf.summary.graph(concrete_fn.graph)writer.close()可视化嵌入向量import tensorflow as tffrom tensorflow.keras import layers, models# 构建带嵌入层的模型model = models.Sequential([ 
layers.Embedding(input_dim=10000, output_dim=128, input_length=50), layers.GlobalAveragePooling1D(), layers.Dense(64, activation='relu'), layers.Dense(1, activation='sigmoid')])# 创建嵌入投影(Projector 读取 checkpoint 和 ProjectorConfig,而不是 summary 事件)import osfrom tensorboard.plugins import projectorlog_dir = 'logs/embeddings'os.makedirs(log_dir, exist_ok=True)# 获取嵌入层embedding_layer = model.layers[0]weights = embedding_layer.get_weights()[0]# 创建元数据文件(单列元数据不能写表头,否则行数 10001 与向量数 10000 不一致)with open(os.path.join(log_dir, 'metadata.tsv'), 'w') as f: for i in range(10000): f.write(f'word_{i}\n')# 将嵌入权重保存为 checkpointembedding_var = tf.Variable(weights)checkpoint = tf.train.Checkpoint(embedding=embedding_var)checkpoint.save(os.path.join(log_dir, 'embedding.ckpt'))# 配置 Projector 并写入 projector_config.pbtxtconfig = projector.ProjectorConfig()embedding = config.embeddings.add()embedding.tensor_name = 'embedding/.ATTRIBUTES/VARIABLE_VALUE'embedding.metadata_path = 'metadata.tsv'projector.visualize_embeddings(log_dir, config)可视化图像数据import tensorflow as tfimport numpy as nplog_dir = 'logs/images'writer = tf.summary.create_file_writer(log_dir)# 生成示例图像with writer.as_default(): for step in range(10): # 创建随机图像 images = np.random.randint(0, 255, (4, 28, 28, 3), dtype=np.uint8) # 记录图像 tf.summary.image('generated_images', images, step=step, max_outputs=4)writer.close()可视化文本数据import tensorflow as tflog_dir = 'logs/text'writer = tf.summary.create_file_writer(log_dir)with writer.as_default(): # 记录文本 texts = [ 'This is a sample text for visualization.', 'TensorBoard can display text data.', 'Text visualization is useful for NLP tasks.' 
] for step, text in enumerate(texts): tf.summary.text(f'sample_text_{step}', text, step=step)writer.close()性能分析使用 TensorBoard Profilerimport tensorflow as tf# 启用性能分析log_dir = 'logs/profiler'writer = tf.summary.create_file_writer(log_dir)# 在训练循环中记录性能tf.profiler.experimental.start(log_dir)# 训练代码for epoch in range(10): for x_batch, y_batch in train_dataset: # 训练步骤 passtf.profiler.experimental.stop()使用 Keras Callback 进行性能分析tensorboard_callback = callbacks.TensorBoard( log_dir='logs/profiler', profile_batch='10,20' # 分析第 10 到 20 个 batch)model.fit( x_train, y_train, epochs=10, callbacks=[tensorboard_callback])多个实验比较import tensorflow as tfimport datetime# 创建不同的实验experiments = [ {'lr': 0.001, 'batch_size': 32}, {'lr': 0.0001, 'batch_size': 64}, {'lr': 0.01, 'batch_size': 16}]for i, exp in enumerate(experiments): # 为每个实验创建独立的日志目录 log_dir = f"logs/experiment_{i}_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}" # 创建 TensorBoard 回调 tensorboard_callback = callbacks.TensorBoard(log_dir=log_dir) # 构建和训练模型 model = create_model() model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=exp['lr']), loss='sparse_categorical_crossentropy') model.fit( x_train, y_train, epochs=10, batch_size=exp['batch_size'], callbacks=[tensorboard_callback] )自定义插件创建自定义可视化import tensorflow as tffrom tensorboard.plugins.hparams import api as hp# 定义超参数HP_NUM_UNITS = hp.HParam('num_units', hp.Discrete([16, 32, 64]))HP_DROPOUT = hp.HParam('dropout', hp.RealInterval(0.1, 0.5))HP_OPTIMIZER = hp.HParam('optimizer', hp.Discrete(['adam', 'sgd']))# 记录超参数log_dir = 'logs/hparam_tuning'with tf.summary.create_file_writer(log_dir).as_default(): hp.hparams_config( hparams=[HP_NUM_UNITS, HP_DROPOUT, HP_OPTIMIZER], metrics=[hp.Metric('accuracy', display_name='Accuracy')] )# 运行超参数调优for num_units in HP_NUM_UNITS.domain.values: for dropout in (HP_DROPOUT.domain.min_value, HP_DROPOUT.domain.max_value): for optimizer in HP_OPTIMIZER.domain.values: hparams = { HP_NUM_UNITS: num_units, HP_DROPOUT: dropout, 
HP_OPTIMIZER: optimizer } # 训练模型 model = create_model(num_units, dropout) model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy']) model.fit(x_train, y_train, epochs=5, verbose=0) # 记录结果:compile 时指定了 metrics,evaluate 才会返回 [loss, accuracy] _, accuracy = model.evaluate(x_test, y_test, verbose=0) # 每个试验写入独立的子目录,HParams 面板才能区分不同试验 trial_id = f'{num_units}_{dropout}_{optimizer}' with tf.summary.create_file_writer(f'{log_dir}/{trial_id}').as_default(): hp.hparams(hparams, trial_id=trial_id) tf.summary.scalar('accuracy', accuracy, step=1)最佳实践使用时间戳:为每次运行创建唯一的日志目录定期记录:不要过于频繁地记录数据,影响性能清理旧日志:定期清理不需要的日志文件使用子目录:为不同类型的指标使用不同的子目录记录超参数:使用 hparams 插件记录超参数监控资源使用:使用性能分析器监控 GPU/CPU 使用情况常见问题1. TensorBoard 无法启动# 检查端口是否被占用lsof -i :6006# 使用不同的端口tensorboard --logdir logs/ --port 60072. 数据不显示# 确保正确关闭 writerwriter.close()# 或者使用上下文管理器with writer.as_default(): tf.summary.scalar('loss', loss, step=step)3. 内存不足# 减少记录频率tensorboard_callback = callbacks.TensorBoard( update_freq='epoch' # 每个 epoch 更新一次)# 或者减少记录的数据量tensorboard_callback = callbacks.TensorBoard( histogram_freq=0, # 不记录直方图 write_images=False # 不记录图像)总结TensorBoard 是 TensorFlow 中强大的可视化工具:实时监控:实时查看训练过程多种可视化:支持标量、图像、文本、音频等多种数据类型性能分析:分析模型性能瓶颈实验比较:比较不同实验的结果易于使用:简单的 API 和直观的界面掌握 TensorBoard 将帮助你更好地理解和优化你的深度学习模型。
服务端阅读 02月18日 17:56

TensorFlow 中的 tf.data API 是什么,如何高效地加载和预处理数据

tf.data API 是 TensorFlow 提供的用于构建高效数据管道的工具集。它能够帮助你快速加载、转换和处理大规模数据集,是深度学习项目中不可或缺的部分。tf.data API 的核心概念Dataset 对象tf.data.Dataset 是 tf.data API 的核心抽象,表示一个元素序列。每个元素包含一个或多个张量。基本操作流程创建数据源:从内存、文件或生成器创建 Dataset转换数据:应用各种转换操作迭代数据:在训练循环中迭代 Dataset创建 Dataset1. 从 NumPy 数组创建import tensorflow as tfimport numpy as np# 准备数据features = np.random.random((1000, 10))labels = np.random.randint(0, 2, size=(1000,))# 创建 Datasetdataset = tf.data.Dataset.from_tensor_slices((features, labels))print(dataset)2. 从 Python 生成器创建def data_generator(): for i in range(100): yield np.random.random((10,)), np.random.randint(0, 2)dataset = tf.data.Dataset.from_generator( data_generator, output_signature=( tf.TensorSpec(shape=(10,), dtype=tf.float32), tf.TensorSpec(shape=(), dtype=tf.int32) ))3. 从 CSV 文件创建import pandas as pd# 读取 CSV 文件df = pd.read_csv('data.csv')# 转换为 Datasetdataset = tf.data.Dataset.from_tensor_slices(( df[['feature1', 'feature2', 'feature3']].values, df['label'].values))4. 从 TFRecord 文件创建# 创建 TFRecord 文件def _bytes_feature(value): return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))def _float_feature(value): return tf.train.Feature(float_list=tf.train.FloatList(value=value))def create_tfrecord(filename, data): with tf.io.TFRecordWriter(filename) as writer: for features, label in data: feature = { 'features': _float_feature(features), 'label': _bytes_feature(str(label).encode()) } example = tf.train.Example(features=tf.train.Features(feature=feature)) writer.write(example.SerializeToString())# 读取 TFRecord 文件def parse_tfrecord(example_proto): feature_description = { 'features': tf.io.FixedLenFeature([10], tf.float32), 'label': tf.io.FixedLenFeature([], tf.string) } example = tf.io.parse_single_example(example_proto, feature_description) features = example['features'] label = tf.strings.to_number(example['label'], out_type=tf.int32) return features, labeldataset = tf.data.TFRecordDataset('data.tfrecord')dataset = dataset.map(parse_tfrecord)5. 
从图像文件创建import pathlib# 获取图像文件路径image_dir = pathlib.Path('images/')image_paths = list(image_dir.glob('*.jpg'))# 创建 Datasetdataset = tf.data.Dataset.from_tensor_slices([str(path) for path in image_paths])def load_image(image_path): image = tf.io.read_file(image_path) image = tf.image.decode_jpeg(image, channels=3) image = tf.image.resize(image, [224, 224]) image = image / 255.0 return imagedataset = dataset.map(load_image)数据转换操作1. map - 应用函数到每个元素def preprocess(features, label): # 归一化 features = tf.cast(features, tf.float32) / 255.0 # 添加噪声 features = features + tf.random.normal(tf.shape(features), 0, 0.01) return features, labeldataset = dataset.map(preprocess)2. batch - 批处理# 创建批次dataset = dataset.batch(32)3. shuffle - 打乱数据# 打乱数据dataset = dataset.shuffle(buffer_size=1000)4. repeat - 重复数据集# 无限重复dataset = dataset.repeat()# 重复指定次数dataset = dataset.repeat(epochs)5. prefetch - 预取数据# 预取数据以提高性能dataset = dataset.prefetch(tf.data.AUTOTUNE)6. filter - 过滤数据# 过滤特定条件的数据dataset = dataset.filter(lambda x, y: y > 0)7. take - 获取前 N 个元素# 获取前 100 个元素dataset = dataset.take(100)8. skip - 跳过前 N 个元素# 跳过前 100 个元素dataset = dataset.skip(100)9. 
cache - 缓存数据集# 缓存到内存dataset = dataset.cache()# 缓存到文件dataset = dataset.cache('cache.tfdata')完整的数据管道示例图像分类数据管道import tensorflow as tfimport pathlibdef create_image_dataset(image_dir, batch_size=32, image_size=(224, 224), augment=True): # 获取图像路径和标签 image_dir = pathlib.Path(image_dir) all_image_paths = [str(path) for path in image_dir.glob('*/*.jpg')] # 提取标签 label_names = sorted(item.name for item in image_dir.glob('*/') if item.is_dir()) label_to_index = dict((name, index) for index, name in enumerate(label_names)) all_image_labels = [label_to_index[pathlib.Path(path).parent.name] for path in all_image_paths] # 创建 Dataset dataset = tf.data.Dataset.from_tensor_slices((all_image_paths, all_image_labels)) # 打乱数据 dataset = dataset.shuffle(buffer_size=len(all_image_paths)) # 加载和预处理图像 def load_and_preprocess_image(path, label): image = tf.io.read_file(path) image = tf.image.decode_jpeg(image, channels=3) image = tf.image.resize(image, image_size) # 先归一化到 [0, 1],random_brightness 的 max_delta=0.2 才是有意义的亮度扰动 image = image / 255.0 if augment: # 数据增强只用于训练集 image = tf.image.random_flip_left_right(image) image = tf.image.random_brightness(image, max_delta=0.2) image = tf.clip_by_value(image, 0.0, 1.0) return image, label dataset = dataset.map(load_and_preprocess_image, num_parallel_calls=tf.data.AUTOTUNE) # 批处理和预取 dataset = dataset.batch(batch_size) dataset = dataset.prefetch(tf.data.AUTOTUNE) return dataset# 使用数据集train_dataset = create_image_dataset('train/', batch_size=32)val_dataset = create_image_dataset('val/', batch_size=32, augment=False)文本分类数据管道import tensorflow as tfdef create_text_dataset(texts, labels, batch_size=32, max_length=100, max_tokens=20000): # 注意:dataset.map 中的函数以图模式执行,不能调用 .numpy(),也不能用 Python dict 查表 # 因此使用 TextVectorization 层完成小写化、分词、查词表以及截断/填充 vectorizer = tf.keras.layers.TextVectorization( max_tokens=max_tokens, output_mode='int', output_sequence_length=max_length ) # 根据文本构建词表:索引 0 为填充,1 为未登录词 [UNK](验证/测试集应复用在训练集上 adapt 的 vectorizer) vectorizer.adapt(texts) # 创建 Dataset dataset = tf.data.Dataset.from_tensor_slices((texts, labels)) # 文本预处理 def preprocess_text(text, label): # TextVectorization 按批处理,这里把单条文本扩成大小为 1 的批,再取出结果 indices = vectorizer(tf.expand_dims(text, -1))[0] return tf.cast(indices, tf.int32), label dataset = dataset.map(preprocess_text, 
num_parallel_calls=tf.data.AUTOTUNE) # 打乱、批处理、预取 dataset = dataset.shuffle(buffer_size=1000) dataset = dataset.batch(batch_size) dataset = dataset.prefetch(tf.data.AUTOTUNE) return dataset性能优化技巧1. 并行处理# 使用 num_parallel_calls 参数并行执行 map 操作dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)2. 缓存# 缓存预处理后的数据dataset = dataset.cache()3. 预取# 预取数据以减少等待时间dataset = dataset.prefetch(tf.data.AUTOTUNE)4. 向量化操作# 使用向量化操作而非循环def vectorized_preprocess(features, labels): features = tf.cast(features, tf.float32) / 255.0 return features, labelsdataset = dataset.map(vectorized_preprocess)5. 减少内存复制# 使用 tf.data.Dataset.from_generator 避免复制大型数组def data_generator(): for i in range(100): yield np.random.random((10,)), np.random.randint(0, 2)dataset = tf.data.Dataset.from_generator( data_generator, output_signature=( tf.TensorSpec(shape=(10,), dtype=tf.float32), tf.TensorSpec(shape=(), dtype=tf.int32) ))与模型训练集成使用 fit 方法import tensorflow as tffrom tensorflow.keras import layers, models# 创建数据集train_dataset = create_image_dataset('train/', batch_size=32)val_dataset = create_image_dataset('val/', batch_size=32)# 构建模型model = models.Sequential([ layers.Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)), layers.MaxPooling2D((2, 2)), layers.Conv2D(64, (3, 3), activation='relu'), layers.MaxPooling2D((2, 2)), layers.Flatten(), layers.Dense(64, activation='relu'), layers.Dense(10, activation='softmax')])# 编译模型model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])# 训练模型model.fit( train_dataset, epochs=10, validation_data=val_dataset)使用自定义训练循环import tensorflow as tffrom tensorflow.keras import optimizers, losses# 创建数据集train_dataset = create_image_dataset('train/', batch_size=32)# 定义优化器和损失函数optimizer = optimizers.Adam(learning_rate=0.001)loss_fn = losses.SparseCategoricalCrossentropy()# 训练步骤@tf.functiondef train_step(images, labels): with tf.GradientTape() as tape: predictions = model(images, training=True) loss = loss_fn(labels, 
predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) return loss# 训练循环epochs = 10for epoch in range(epochs): total_loss = 0 for images, labels in train_dataset: loss = train_step(images, labels) total_loss += loss.numpy() avg_loss = total_loss / len(train_dataset) print(f'Epoch {epoch + 1}, Loss: {avg_loss:.4f}')数据增强def augment_image(image, label): # 随机翻转 image = tf.image.random_flip_left_right(image) # 随机旋转 image = tf.image.rot90(image, k=tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)) # 随机亮度 image = tf.image.random_brightness(image, max_delta=0.2) # 随机对比度 image = tf.image.random_contrast(image, lower=0.8, upper=1.2) return image, label# 应用数据增强train_dataset = train_dataset.map(augment_image, num_parallel_calls=tf.data.AUTOTUNE)处理不平衡数据# 计算类别权重class_weights = {0: 1.0, 1: 2.0} # 类别 1 的权重更高# 在训练时使用类别权重model.fit( train_dataset, epochs=10, class_weight=class_weights)# 或者使用重采样def resample_dataset(dataset, target_dist): # 实现重采样逻辑 pass监控数据管道性能import timedef benchmark_dataset(dataset, num_epochs=2): start_time = time.time() for epoch in range(num_epochs): for i, (images, labels) in enumerate(dataset): if i % 100 == 0: print(f'Epoch {epoch + 1}, Batch {i}') end_time = time.time() print(f'Total time: {end_time - start_time:.2f} seconds')# 测试数据集性能benchmark_dataset(train_dataset)最佳实践始终使用 prefetch:减少 GPU 等待时间并行化 map 操作:使用 num_parallel_calls=tf.data.AUTOTUNE缓存预处理后的数据:如果数据可以放入内存合理设置 buffer_size:对于 shuffle 操作使用向量化操作:避免 Python 循环监控性能:使用 TensorBoard 或自定义指标监控数据管道性能处理异常:添加适当的错误处理逻辑总结tf.data API 是 TensorFlow 中构建高效数据管道的强大工具:灵活的数据源:支持多种数据格式丰富的转换操作:map、batch、shuffle、filter 等性能优化:并行处理、缓存、预取易于集成:与 Keras API 无缝集成掌握 tf.data API 将帮助你构建高效、可扩展的数据管道,提升模型训练效率。
服务端阅读 02月18日 17:35

如何在 TensorFlow 中构建和训练神经网络模型

在 TensorFlow 中构建和训练神经网络模型是深度学习的核心任务。TensorFlow 提供了多种方式来构建模型,从高级 API 到低级自定义实现。使用 Keras Sequential APISequential API 是最简单的方式,适用于简单的线性堆叠模型:import tensorflow as tffrom tensorflow.keras import layers, models# 创建 Sequential 模型model = models.Sequential([ layers.Dense(128, activation='relu', input_shape=(784,)), layers.Dropout(0.2), layers.Dense(64, activation='relu'), layers.Dropout(0.2), layers.Dense(10, activation='softmax')])# 查看模型结构model.summary()使用 Keras Functional APIFunctional API 提供更灵活的模型构建方式,支持复杂的多输入多输出模型:from tensorflow.keras import layers, models, Input# 定义输入层inputs = Input(shape=(784,))# 构建隐藏层x = layers.Dense(128, activation='relu')(inputs)x = layers.Dropout(0.2)(x)x = layers.Dense(64, activation='relu')(x)x = layers.Dropout(0.2)(x)# 定义输出层outputs = layers.Dense(10, activation='softmax')(x)# 创建模型model = models.Model(inputs=inputs, outputs=outputs)model.summary()自定义模型类对于更复杂的模型,可以继承 tf.keras.Model 类:import tensorflow as tffrom tensorflow.keras import layers, modelsclass CustomModel(models.Model): def __init__(self): super(CustomModel, self).__init__() self.dense1 = layers.Dense(128, activation='relu') self.dropout1 = layers.Dropout(0.2) self.dense2 = layers.Dense(64, activation='relu') self.dropout2 = layers.Dropout(0.2) self.dense3 = layers.Dense(10, activation='softmax') def call(self, inputs, training=False): x = self.dense1(inputs) x = self.dropout1(x, training=training) x = self.dense2(x) x = self.dropout2(x, training=training) return self.dense3(x)# 创建模型实例model = CustomModel()常用层类型1. 全连接层(Dense)layers.Dense(units=64, activation='relu', input_shape=(784,))2. 卷积层(Conv2D)layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1))3. 池化层(MaxPooling2D)layers.MaxPooling2D(pool_size=(2, 2))4. 批归一化层(BatchNormalization)layers.BatchNormalization()5. Dropout 层layers.Dropout(0.5)6. Flatten 层layers.Flatten()7. LSTM 层layers.LSTM(units=64, return_sequences=True)8. 
注意力层layers.Attention()激活函数# ReLUlayers.Dense(64, activation='relu')# Sigmoidlayers.Dense(64, activation='sigmoid')# Tanhlayers.Dense(64, activation='tanh')# Softmaxlayers.Dense(10, activation='softmax')# LeakyReLUlayers.LeakyReLU(alpha=0.1)# ELUlayers.Dense(64, activation='elu')# SELUlayers.Dense(64, activation='selu')编译模型在训练之前,需要编译模型,指定优化器、损失函数和评估指标:model.compile( optimizer='adam', # 或使用 tf.keras.optimizers.Adam(learning_rate=0.001) loss='sparse_categorical_crossentropy', # 或使用自定义损失函数 metrics=['accuracy'] # 可以指定多个指标)常用优化器# SGDoptimizer = tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9)# Adamoptimizer = tf.keras.optimizers.Adam(learning_rate=0.001)# RMSpropoptimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)# Adagradoptimizer = tf.keras.optimizers.Adagrad(learning_rate=0.01)# Adadeltaoptimizer = tf.keras.optimizers.Adadelta(learning_rate=1.0)常用损失函数# 回归问题loss = 'mse' # 均方误差loss = 'mae' # 平均绝对误差# 二分类问题loss = 'binary_crossentropy'# 多分类问题loss = 'categorical_crossentropy' # one-hot 编码loss = 'sparse_categorical_crossentropy' # 整数标签# 自定义损失函数def custom_loss(y_true, y_pred): return tf.reduce_mean(tf.square(y_true - y_pred))常用评估指标metrics = ['accuracy', 'precision', 'recall']训练模型使用 fit 方法训练import numpy as np# 准备数据x_train = np.random.random((1000, 784))y_train = np.random.randint(0, 10, size=(1000,))x_val = np.random.random((200, 784))y_val = np.random.randint(0, 10, size=(200,))# 训练模型history = model.fit( x_train, y_train, epochs=10, batch_size=32, validation_data=(x_val, y_val), callbacks=[ tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True), tf.keras.callbacks.ModelCheckpoint('best_model.h5', save_best_only=True), tf.keras.callbacks.ReduceLROnPlateau(factor=0.1, patience=2) ])使用 tf.data.Dataset 训练# 创建 Datasettrain_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))train_dataset = train_dataset.shuffle(buffer_size=1000).batch(32).prefetch(tf.data.AUTOTUNE)val_dataset = tf.data.Dataset.from_tensor_slices((x_val, 
y_val))val_dataset = val_dataset.batch(32)# 训练history = model.fit( train_dataset, epochs=10, validation_data=val_dataset)自定义训练循环对于更复杂的训练逻辑,可以使用自定义训练循环:import tensorflow as tffrom tensorflow.keras import optimizers, losses# 定义优化器和损失函数optimizer = optimizers.Adam(learning_rate=0.001)loss_fn = losses.SparseCategoricalCrossentropy()# 训练步骤@tf.functiondef train_step(x_batch, y_batch): with tf.GradientTape() as tape: predictions = model(x_batch, training=True) loss = loss_fn(y_batch, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) return loss# 验证步骤@tf.functiondef val_step(x_batch, y_batch): predictions = model(x_batch, training=False) loss = loss_fn(y_batch, predictions) return loss# 训练循环epochs = 10for epoch in range(epochs): print(f'Epoch {epoch + 1}/{epochs}') # 训练 train_loss = 0 for x_batch, y_batch in train_dataset: loss = train_step(x_batch, y_batch) train_loss += loss.numpy() train_loss /= len(train_dataset) # 验证 val_loss = 0 for x_batch, y_batch in val_dataset: loss = val_step(x_batch, y_batch) val_loss += loss.numpy() val_loss /= len(val_dataset) print(f'Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')回调函数(Callbacks)TensorFlow 提供了多种回调函数来控制训练过程:from tensorflow.keras.callbacks import Callbackclass CustomCallback(Callback): def on_train_begin(self, logs=None): print('Starting training...') def on_epoch_end(self, epoch, logs=None): print(f'Epoch {epoch + 1} - Loss: {logs["loss"]:.4f}') def on_batch_end(self, batch, logs=None): if batch % 100 == 0: print(f'Batch {batch} - Loss: {logs["loss"]:.4f}')# 使用回调model.fit( x_train, y_train, epochs=10, callbacks=[CustomCallback()])常用回调函数callbacks = [ # 早停 tf.keras.callbacks.EarlyStopping( monitor='val_loss', patience=5, restore_best_weights=True ), # 模型检查点 tf.keras.callbacks.ModelCheckpoint( 'model_{epoch:02d}.h5', save_best_only=True, monitor='val_loss' ), # 学习率调度 tf.keras.callbacks.ReduceLROnPlateau( monitor='val_loss', 
factor=0.1, patience=3 ), # TensorBoard tf.keras.callbacks.TensorBoard( log_dir='./logs', histogram_freq=1 ), # 学习率衰减 tf.keras.callbacks.LearningRateScheduler( lambda epoch: 0.001 * (0.9 ** epoch) )]评估模型# 评估模型test_loss, test_acc = model.evaluate(x_test, y_test)print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}')# 预测predictions = model.predict(x_test)predicted_classes = np.argmax(predictions, axis=1)保存和加载模型# 保存整个模型model.save('my_model.h5')# 加载模型loaded_model = tf.keras.models.load_model('my_model.h5')# 只保存权重model.save_weights('model_weights.h5')# 加载权重model.load_weights('model_weights.h5')# 保存为 SavedModel 格式model.save('saved_model/my_model')# 加载 SavedModelloaded_model = tf.keras.models.load_model('saved_model/my_model')完整示例:MNIST 分类import tensorflow as tffrom tensorflow.keras import layers, models# 加载数据(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()# 预处理x_train = x_train.reshape(-1, 784).astype('float32') / 255.0x_test = x_test.reshape(-1, 784).astype('float32') / 255.0# 构建模型model = models.Sequential([ layers.Dense(128, activation='relu', input_shape=(784,)), layers.Dropout(0.2), layers.Dense(64, activation='relu'), layers.Dropout(0.2), layers.Dense(10, activation='softmax')])# 编译模型model.compile( optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])# 训练模型history = model.fit( x_train, y_train, epochs=10, batch_size=128, validation_split=0.2, callbacks=[ tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True) ])# 评估模型test_loss, test_acc = model.evaluate(x_test, y_test)print(f'Test Accuracy: {test_acc:.4f}')性能优化建议使用 GPU 加速:确保 TensorFlow 能够使用 GPU数据预取:使用 tf.data.Dataset.prefetch() 提高数据加载效率混合精度训练:使用 tf.keras.mixed_precision 提高训练速度批归一化:使用 BatchNormalization 加速收敛学习率调度:使用适当的学习率调度策略总结在 TensorFlow 中构建和训练神经网络模型的关键步骤:选择模型构建方式:Sequential API、Functional API 或自定义模型类设计网络架构:选择合适的层和激活函数编译模型:指定优化器、损失函数和评估指标训练模型:使用 fit() 方法或自定义训练循环监控训练过程:使用回调函数和 TensorBoard评估和优化:评估模型性能并进行调优掌握这些技能将帮助你有效地构建和训练各种深度学习模型。
服务端阅读 02月17日 23:52

TypeORM 的订阅者 (Subscriber) 是什么?如何使用订阅者监听实体事件

订阅者是 TypeORM 中用于监听和响应实体事件的强大机制。它允许开发者在实体生命周期的关键时刻执行自定义逻辑,类似于数据库触发器,但更加灵活和类型安全。订阅者基础概念什么是订阅者订阅者是一个类,它监听特定实体的生命周期事件,并在这些事件发生时执行相应的逻辑。订阅者可以监听的事件包括:beforeInsert: 插入前afterInsert: 插入后beforeUpdate: 更新前afterUpdate: 更新后beforeRemove: 删除前afterRemove: 删除后beforeSoftRemove: 软删除前afterSoftRemove: 软删除后beforeRecover: 恢复前afterRecover: 恢复后订阅者 vs 监听器订阅者: 独立于实体定义的类,可通过 listenTo() 指定只监听某个实体;不实现 listenTo() 时会监听所有实体的事件,适合跨实体的全局逻辑监听器: 在实体类内部通过 @BeforeInsert 等装饰器定义,只监听该实体的事件,适合实体特定的逻辑创建订阅者基本订阅者示例import { EntitySubscriberInterface, EventSubscriber, InsertEvent, UpdateEvent, RemoveEvent } from 'typeorm';import { User } from '../entity/User';@EventSubscriber()export class UserSubscriber implements EntitySubscriberInterface<User> { // 指定要监听的实体 listenTo() { return User; } // 插入前 beforeInsert(event: InsertEvent<User>) { console.log('Before insert user:', event.entity); // 自动生成用户名 if (!event.entity.username) { event.entity.username = event.entity.email.split('@')[0]; } // 自动设置创建时间 if (!event.entity.createdAt) { event.entity.createdAt = new Date(); } } // 插入后 afterInsert(event: InsertEvent<User>) { console.log('After insert user:', event.entity); // 发送欢迎邮件 this.sendWelcomeEmail(event.entity); // 记录审计日志 this.logAudit('INSERT', event.entity); } // 更新前 beforeUpdate(event: UpdateEvent<User>) { console.log('Before update user:', event.entity); // 自动更新修改时间 if (event.entity) { event.entity.updatedAt = new Date(); } } // 更新后 afterUpdate(event: UpdateEvent<User>) { console.log('After update user:', event.entity); // 记录变更历史 this.recordChangeHistory(event); } // 删除前 beforeRemove(event: RemoveEvent<User>) { console.log('Before remove user:', event.entity); // 检查是否可以删除(RemoveEvent.entity 的类型是可选的,需要先判空) if (event.entity?.hasActiveOrders()) { throw new Error('Cannot delete user with active orders'); } } // 删除后 afterRemove(event: RemoveEvent<User>) { console.log('After remove user:', event.entity); // 清理关联数据 // 注意:remove() 之后实体上的主键可能已被 TypeORM 置为 undefined(请在所用版本中确认),因此回退到 databaseEntity const userId = event.entity?.id ?? event.databaseEntity?.id; if (userId !== undefined) { this.cleanupRelatedData(userId); } } private sendWelcomeEmail(user: User) { // 发送欢迎邮件的逻辑 console.log(`Sending welcome email to ${user.email}`); } private logAudit(action: string, user: User) { // 记录审计日志的逻辑 console.log(`Audit log: ${action} user 
${user.id}`); } private recordChangeHistory(event: UpdateEvent<User>) { // 记录变更历史的逻辑 console.log('Recording change history:', event.databaseEntity, event.entity); } private cleanupRelatedData(userId: number) { // 清理关联数据的逻辑 console.log(`Cleaning up data for user ${userId}`); }}注册订阅者在 DataSource 中注册import { DataSource } from 'typeorm';import { UserSubscriber } from './subscriber/UserSubscriber';export const AppDataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: [User, Post, Comment], subscribers: [UserSubscriber], // 注册订阅者 synchronize: false, logging: true,});动态注册订阅者import { DataSource } from 'typeorm';const dataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: [User, Post, Comment], synchronize: false, logging: true,});// 初始化后动态添加订阅者await dataSource.initialize();const userSubscriber = new UserSubscriber();dataSource.subscribers.push(userSubscriber);高级订阅者用法数据验证@EventSubscriber()export class UserSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } beforeInsert(event: InsertEvent<User>) { this.validateUser(event.entity); } beforeUpdate(event: UpdateEvent<User>) { if (event.entity) { this.validateUser(event.entity); } } private validateUser(user: User) { // 验证邮箱格式 const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/; if (!emailRegex.test(user.email)) { throw new Error('Invalid email format'); } // 验证年龄 if (user.age && (user.age < 18 || user.age > 120)) { throw new Error('Age must be between 18 and 120'); } // 验证用户名长度 if (user.username && user.username.length < 3) { throw new Error('Username must be at least 3 characters'); } }}自动填充字段@EventSubscriber()export class BaseEntitySubscriber implements EntitySubscriberInterface { listenTo() { return Object; // 监听所有实体 } beforeInsert(event: InsertEvent<any>) { const entity = event.entity; const now = new Date(); // 自动设置创建时间 
if (entity.hasOwnProperty('createdAt') && !entity.createdAt) { entity.createdAt = now; } // 自动设置更新时间 if (entity.hasOwnProperty('updatedAt') && !entity.updatedAt) { entity.updatedAt = now; } // 自动设置创建者 if (entity.hasOwnProperty('createdBy') && !entity.createdBy) { entity.createdBy = this.getCurrentUserId(); } } beforeUpdate(event: UpdateEvent<any>) { const entity = event.entity; if (entity) { // 自动更新更新时间 if (entity.hasOwnProperty('updatedAt')) { entity.updatedAt = new Date(); } // 自动设置更新者 if (entity.hasOwnProperty('updatedBy')) { entity.updatedBy = this.getCurrentUserId(); } } } private getCurrentUserId(): number { // 获取当前用户 ID 的逻辑 return 1; // 示例 }}审计日志@EventSubscriber()export class AuditLogSubscriber implements EntitySubscriberInterface { listenTo() { return Object; // 监听所有实体 } async afterInsert(event: InsertEvent<any>) { await this.createAuditLog('INSERT', event.entity); } async afterUpdate(event: UpdateEvent<any>) { await this.createAuditLog('UPDATE', event.entity, event.databaseEntity); } async afterRemove(event: any) { await this.createAuditLog('DELETE', event.entity); } private async createAuditLog( action: string, entity: any, oldEntity?: any ) { const auditLog = { action, entityType: entity.constructor.name, entityId: entity.id, userId: this.getCurrentUserId(), timestamp: new Date(), changes: oldEntity ? 
this.getChanges(oldEntity, entity) : null, ipAddress: this.getCurrentIpAddress(), }; // 保存审计日志 console.log('Creating audit log:', auditLog); // await this.auditLogRepository.save(auditLog); } private getChanges(oldEntity: any, newEntity: any): any { const changes: any = {}; for (const key in newEntity) { if (oldEntity[key] !== newEntity[key]) { changes[key] = { old: oldEntity[key], new: newEntity[key], }; } } return changes; } private getCurrentUserId(): number { return 1; // 示例 } private getCurrentIpAddress(): string { return '127.0.0.1'; // 示例 }}缓存失效@EventSubscriber()export class CacheInvalidationSubscriber implements EntitySubscriberInterface { private cacheService: CacheService; constructor() { this.cacheService = new CacheService(); } listenTo() { return Object; // 监听所有实体 } async afterInsert(event: InsertEvent<any>) { await this.invalidateCache(event.entity); } async afterUpdate(event: UpdateEvent<any>) { if (event.entity) { await this.invalidateCache(event.entity); } } async afterRemove(event: any) { await this.invalidateCache(event.entity); } private async invalidateCache(entity: any) { const entityType = entity.constructor.name.toLowerCase(); const entityId = entity.id; // 使单个实体的缓存失效 await this.cacheService.delete(`${entityType}:${entityId}`); // 使列表缓存失效 await this.cacheService.delete(`${entityType}:list:*`); console.log(`Cache invalidated for ${entityType}:${entityId}`); }}通知和事件@EventSubscriber()export class NotificationSubscriber implements EntitySubscriberInterface { private notificationService: NotificationService; constructor() { this.notificationService = new NotificationService(); } listenTo() { return Object; // 监听所有实体 } async afterInsert(event: InsertEvent<any>) { await this.handleInsertEvent(event); } async afterUpdate(event: UpdateEvent<any>) { await this.handleUpdateEvent(event); } private async handleInsertEvent(event: InsertEvent<any>) { const entity = event.entity; // 根据实体类型发送不同的通知 switch (entity.constructor.name) { case 'Order': await 
this.notificationService.sendOrderCreatedNotification(entity); break; case 'Comment': await this.notificationService.sendCommentNotification(entity); break; case 'Message': await this.notificationService.sendMessageNotification(entity); break; } } private async handleUpdateEvent(event: UpdateEvent<any>) { const entity = event.entity; if (!entity) return; // 根据实体类型和变更发送通知 switch (entity.constructor.name) { case 'Order': if (entity.status !== event.databaseEntity.status) { await this.notificationService.sendOrderStatusChangedNotification(entity); } break; case 'User': if (entity.email !== event.databaseEntity.email) { await this.notificationService.sendEmailChangedNotification(entity); } break; } }}订阅者最佳实践1. 单一职责原则每个订阅者应该只负责一个特定的功能领域。// ✅ 好的做法:每个订阅者负责一个功能@EventSubscriber()export class UserValidationSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } beforeInsert(event: InsertEvent<User>) { /* 验证逻辑 */ }}@EventSubscriber()export class UserAuditSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } afterInsert(event: InsertEvent<User>) { /* 审计逻辑 */ }}// ❌ 不好的做法:一个订阅者负责多个功能@EventSubscriber()export class UserSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } beforeInsert(event: InsertEvent<User>) { /* 验证逻辑 */ } afterInsert(event: InsertEvent<User>) { /* 审计逻辑 */ } afterUpdate(event: UpdateEvent<User>) { /* 通知逻辑 */ }}2. 避免循环触发在订阅者中再次保存实体会重新触发订阅者(包括它自己),可能造成无限循环。改用 repository.manager.save 并不能避免这一点,因为 EntityManager 的 save 同样会调用监听器和订阅者;正确做法是在保存时传入 { listeners: false }。另外,TypeORM 会以无参构造函数实例化订阅者,无法通过构造函数注入 Repository,应使用事件对象提供的 event.manager(它与触发事件的操作共用同一个 QueryRunner/事务)。@EventSubscriber()export class UserSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { // ❌ 不好的做法:再次保存会重新触发本订阅者,可能导致循环 // await event.manager.save(event.entity); // ✅ 好的做法:传入 { listeners: false },本次保存不会触发监听器和订阅者 await event.manager.save(User, event.entity, { listeners: false }); }}3. 
错误处理在订阅者中妥善处理错误,避免影响主操作。@EventSubscriber()export class SafeSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { try { await this.sendNotification(event.entity); } catch (error) { // 记录错误但不影响主操作 console.error('Failed to send notification:', error); // 可以将错误发送到错误监控系统 } } private async sendNotification(user: User) { // 发送通知的逻辑 }}4. 性能考虑避免在订阅者中执行耗时操作。@EventSubscriber()export class PerformanceAwareSubscriber implements EntitySubscriberInterface<User> { listenTo() { return User; } async afterInsert(event: InsertEvent<User>) { // ❌ 不好的做法:同步执行耗时操作 // await this.sendEmail(event.entity); // await this.generateReport(event.entity); // ✅ 好的做法:异步执行耗时操作 setImmediate(() => { this.sendEmail(event.entity).catch(console.error); this.generateReport(event.entity).catch(console.error); }); // 或者使用消息队列 // await this.queueService.add('send-email', { userId: event.entity.id }); } private async sendEmail(user: User) { // 发送邮件的逻辑 } private async generateReport(user: User) { // 生成报告的逻辑 }}5. 
测试订阅者为订阅者编写单元测试。import { InsertEvent } from 'typeorm';import { UserSubscriber } from './UserSubscriber';import { User } from '../entity/User';describe('UserSubscriber', () => { let subscriber: UserSubscriber; beforeEach(() => { subscriber = new UserSubscriber(); }); it('should auto-generate username before insert', () => { const user = new User(); user.email = 'test@example.com'; const event: InsertEvent<User> = { entity: user, metadata: {} as any, queryRunner: {} as any, manager: {} as any, }; subscriber.beforeInsert(event); expect(user.username).toBe('test'); }); it('should set createdAt before insert', () => { const user = new User(); user.email = 'test@example.com'; const event: InsertEvent<User> = { entity: user, metadata: {} as any, queryRunner: {} as any, manager: {} as any, }; subscriber.beforeInsert(event); expect(user.createdAt).toBeInstanceOf(Date); });});订阅者 vs 监听器监听器示例@Entity()export class User { @PrimaryGeneratedColumn() id: number; @Column() name: string; @Column() email: string; @CreateDateColumn() createdAt: Date; @UpdateDateColumn() updatedAt: Date; @BeforeInsert() beforeInsert() { console.log('Before insert in entity'); this.createdAt = new Date(); } @AfterInsert() afterInsert() { console.log('After insert in entity'); } @BeforeUpdate() beforeUpdate() { console.log('Before update in entity'); this.updatedAt = new Date(); } @AfterUpdate() afterUpdate() { console.log('After update in entity'); }}选择订阅者还是监听器使用订阅者的情况:需要监听多个实体的事件; 需要使用其他服务(纯 TypeORM 会以无参构造函数实例化订阅者,依赖注入需借助框架,例如在 NestJS 中注入 DataSource 后执行 dataSource.subscribers.push(this)); 需要实现跨实体的业务逻辑; 需要保持实体类的简洁。使用监听器的情况:逻辑只与单个实体相关; 逻辑简单,不需要外部依赖; 希望逻辑与实体紧密关联。TypeORM 的订阅者机制提供了强大而灵活的事件处理能力,合理使用订阅者可以实现复杂的业务逻辑,同时保持代码的整洁和可维护性。
服务端阅读 02月17日 23:49

TypeORM 的迁移系统如何工作?如何创建、运行和管理数据库迁移

迁移系统是 TypeORM 中用于管理数据库结构变更的重要工具,它允许开发者以版本化的方式追踪和应用数据库结构的变化,确保团队协作时数据库结构的一致性。迁移基础概念什么是迁移迁移是数据库结构变更的脚本,用于:创建或删除表、添加或删除列、修改列类型、创建或删除索引、添加或删除外键约束。每个迁移的类名和文件名都带有唯一的时间戳,TypeORM 据此按顺序执行迁移,并在 migrations 表中记录已执行过的迁移。迁移文件结构import { MigrationInterface, QueryRunner, Table } from 'typeorm';export class CreateUserTable1234567890123 implements MigrationInterface { public async up(queryRunner: QueryRunner): Promise<void> { // 执行迁移:创建表、添加列等 await queryRunner.createTable( new Table({ name: 'user', columns: [ { name: 'id', type: 'int', isPrimary: true, isGenerated: true, generationStrategy: 'increment', }, { name: 'name', type: 'varchar', }, { name: 'email', type: 'varchar', isUnique: true, }, { name: 'createdAt', type: 'timestamp', default: 'CURRENT_TIMESTAMP', }, ], }), true ); } public async down(queryRunner: QueryRunner): Promise<void> { // 回滚迁移:删除表、移除列等 await queryRunner.dropTable('user'); }}创建迁移使用 CLI 创建迁移(以下为 TypeORM 0.3+ 的写法:旧版的 -n 参数已被移除,改为直接传入迁移文件路径;需要连接数据库的命令必须用 -d 指定导出 DataSource 的文件。示例假设 package.json 中定义了脚本 "typeorm": "typeorm-ts-node-commonjs")# 根据实体与数据库当前结构的差异自动生成迁移npm run typeorm migration:generate -- src/migration/CreateUserTable -d src/data-source.ts# 或者使用 npxnpx typeorm-ts-node-commonjs migration:generate src/migration/CreateUserTable -d src/data-source.ts# 创建空迁移(手动编写 up/down)npm run typeorm migration:create -- src/migration/CreateUserTable配置 DataSource(例如放在 src/data-source.ts 中,供 CLI 的 -d 参数使用)import { DataSource } from 'typeorm';export const AppDataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: ['src/entity/**/*.ts'], migrations: ['src/migration/**/*.ts'], subscribers: ['src/subscriber/**/*.ts'], synchronize: false, // 生产环境必须设为 false logging: true,});运行迁移使用 CLI 运行迁移# 运行所有待执行的迁移npm run typeorm migration:run -- -d src/data-source.ts# 回滚最后一次执行的迁移npm run typeorm migration:revert -- -d src/data-source.ts# 显示迁移状态npm run typeorm migration:show -- -d src/data-source.ts# 删除数据库中的所有表(慎用)npm run typeorm schema:drop -- -d src/data-source.ts在代码中运行迁移import { DataSource } from 'typeorm';async function runMigrations() { const dataSource = new DataSource({ type: 'mysql', host: 'localhost', port: 3306, username: 'root', password: 'password', database: 'myapp', entities: ['src/entity/**/*.ts'], migrations: ['src/migration/**/*.ts'], }); await dataSource.initialize(); // 运行所有待执行的迁移 await dataSource.runMigrations(); // 回滚最后一次迁移 // await 
dataSource.undoLastMigration(); await dataSource.destroy();}runMigrations().catch(console.error);迁移操作示例创建表public async up(queryRunner: QueryRunner): Promise<void> { await queryRunner.createTable( new Table({ name: 'user', columns: [ { name: 'id', type: 'int', isPrimary: true, isGenerated: true, generationStrategy: 'increment', }, { name: 'name', type: 'varchar', length: '100', }, { name: 'email', type: 'varchar', length: '255', isUnique: true, }, { name: 'age', type: 'int', nullable: true, }, { name: 'isActive', type: 'boolean', default: true, }, { name: 'createdAt', type: 'timestamp', default: 'CURRENT_TIMESTAMP', }, { name: 'updatedAt', type: 'timestamp', default: 'CURRENT_TIMESTAMP', onUpdate: 'CURRENT_TIMESTAMP', }, ], indices: [ { name: 'IDX_USER_EMAIL', columnNames: ['email'], }, ], }), true );}添加列public async up(queryRunner: QueryRunner): Promise<void> { await queryRunner.addColumn( 'user', new TableColumn({ name: 'avatar', type: 'varchar', length: '255', isNullable: true, }) );}public async down(queryRunner: QueryRunner): Promise<void> { await queryRunner.dropColumn('user', 'avatar');}修改列public async up(queryRunner: QueryRunner): Promise<void> { await queryRunner.changeColumn( 'user', 'name', new TableColumn({ name: 'name', type: 'varchar', length: '200', // 修改长度 }) );}public async down(queryRunner: QueryRunner): Promise<void> { await queryRunner.changeColumn( 'user', 'name', new TableColumn({ name: 'name', type: 'varchar', length: '100', }) );}创建索引public async up(queryRunner: QueryRunner): Promise<void> { await queryRunner.createIndex( 'user', new TableIndex({ name: 'IDX_USER_EMAIL', columnNames: ['email'], isUnique: true, }) );}public async down(queryRunner: QueryRunner): Promise<void> { await queryRunner.dropIndex('user', 'IDX_USER_EMAIL');}添加外键public async up(queryRunner: QueryRunner): Promise<void> { await queryRunner.createForeignKey( 'post', new TableForeignKey({ columnNames: ['authorId'], referencedColumnNames: ['id'], referencedTableName: 'user', 
onDelete: 'CASCADE', }) );}public async down(queryRunner: QueryRunner): Promise<void> { const table = await queryRunner.getTable('post'); const foreignKey = table.foreignKeys.find( fk => fk.columnNames.indexOf('authorId') !== -1 ); await queryRunner.dropForeignKey('post', foreignKey);}执行原生 SQLpublic async up(queryRunner: QueryRunner): Promise<void> { await queryRunner.query(` CREATE TRIGGER update_user_timestamp BEFORE UPDATE ON user FOR EACH ROW SET NEW.updatedAt = CURRENT_TIMESTAMP `);}public async down(queryRunner: QueryRunner): Promise<void> { await queryRunner.query(`DROP TRIGGER update_user_timestamp`);}数据迁移迁移现有数据public async up(queryRunner: QueryRunner): Promise<void> { // 添加新列 await queryRunner.addColumn( 'user', new TableColumn({ name: 'fullName', type: 'varchar', length: '200', isNullable: true, }) ); // 迁移数据 await queryRunner.query(` UPDATE user SET fullName = CONCAT(firstName, ' ', lastName) `); // 删除旧列 await queryRunner.dropColumn('user', 'firstName'); await queryRunner.dropColumn('user', 'lastName');}批量插入数据public async up(queryRunner: QueryRunner): Promise<void> { const users = [ { name: 'John', email: 'john@example.com' }, { name: 'Jane', email: 'jane@example.com' }, ]; for (const user of users) { await queryRunner.query( `INSERT INTO user (name, email) VALUES (?, ?)`, [user.name, user.email] ); }}迁移最佳实践1. 版本控制// 迁移文件命名格式: {timestamp}-{name}.ts// 例如: 1234567890123-CreateUserTable.tsexport class CreateUserTable1234567890123 implements MigrationInterface { // 迁移内容}2. 可逆性确保每个迁移都可以完全回滚:export class AddUserAvatar1234567890123 implements MigrationInterface { public async up(queryRunner: QueryRunner): Promise<void> { await queryRunner.addColumn('user', new TableColumn({ name: 'avatar', type: 'varchar', isNullable: true, })); } public async down(queryRunner: QueryRunner): Promise<void> { await queryRunner.dropColumn('user', 'avatar'); }}3. 
事务支持TypeORM 默认会把一次运行中的所有迁移放在同一个事务里执行(DataSource 选项 migrationsTransactionMode 默认为 'all',也可设为 'each' 或 'none';CLI 的 migration:run 可用 -t 参数覆盖),因此通常不需要在 up() 中手动开启事务。只有以 'none' 模式运行迁移时,才需要像下面这样手动控制事务。注意:MySQL 中的 DDL 语句(CREATE/ALTER/DROP TABLE 等)会隐式提交,事务无法回滚已经执行的结构变更。public async up(queryRunner: QueryRunner): Promise<void> { await queryRunner.startTransaction(); try { await queryRunner.createTable(/* ... */); await queryRunner.addColumn(/* ... */); await queryRunner.commitTransaction(); } catch (err) { await queryRunner.rollbackTransaction(); throw err; }}4. 环境区分export class AddProductionIndex1234567890123 implements MigrationInterface { public async up(queryRunner: QueryRunner): Promise<void> { // 只在生产环境执行 if (process.env.NODE_ENV === 'production') { await queryRunner.createIndex(/* ... */); } } public async down(queryRunner: QueryRunner): Promise<void> { if (process.env.NODE_ENV === 'production') { await queryRunner.dropIndex(/* ... */); } }}常见问题1. 迁移冲突当多个开发者同时创建迁移时可能出现冲突(例如两个迁移修改了同一张表,或某个迁移是基于过时的数据库结构生成的):# 解决方法:合并代码后,先应用所有已有迁移npm run typeorm migration:run -- -d src/data-source.ts# 再根据实体与数据库的差异重新生成迁移npm run typeorm migration:generate -- src/migration/FixMigrationConflict -d src/data-source.ts2. 数据丢失风险在修改列类型或删除列前备份数据。注意:在 TypeORM 的 MySQL、PostgreSQL 等驱动中,queryRunner.changeColumn 在列类型或长度发生变化时会先删除旧列再新建列,原有数据会丢失;需要保留数据时,应改用 queryRunner.query 执行 ALTER TABLE ... MODIFY / ALTER COLUMN 语句:public async up(queryRunner: QueryRunner): Promise<void> { // 备份数据 await queryRunner.query(`CREATE TABLE user_backup AS SELECT * FROM user`); // 执行迁移 await queryRunner.changeColumn(/* ... */);}3. 性能问题对于大数据表的迁移,考虑分批处理(示例使用 LIMIT/OFFSET;OFFSET 越大扫描越慢,数据量很大时按主键范围分批,如 WHERE id > ? ORDER BY id LIMIT ?,效率更高):public async up(queryRunner: QueryRunner): Promise<void> { const batchSize = 1000; let offset = 0; while (true) { const users = await queryRunner.query( `SELECT id FROM user LIMIT ${batchSize} OFFSET ${offset}` ); if (users.length === 0) break; for (const user of users) { await queryRunner.query(`UPDATE user SET ... WHERE id = ?`, [user.id]); } offset += batchSize; }}生产环境建议禁用 synchronize: 生产环境必须设置 synchronize: false; 备份策略: 执行迁移前备份数据库; 测试环境: 先在测试环境验证迁移; 回滚计划: 准备好回滚方案; 监控日志: 监控迁移执行日志; 分步执行: 对于大型迁移,分步骤执行。TypeORM 的迁移系统提供了强大而灵活的数据库结构管理能力,掌握迁移系统的使用对于维护大型应用的数据库结构至关重要。