标签

Python

Python 是一种动态类型、多用途的编程语言。它旨在快速学习、理解和使用,并强制执行干净且统一的语法。

Python
服务端6月4日 15:48
Python上下文管理器:__exit__异常处理、@contextmanager和ExitStack`with open(...) as f:` 这行代码用了十几年,但很多人不知道背后的机制——上下文管理器。更关键的是,自己写一个靠谱的上下文管理器并不简单:`__exit__` 里该不该吃掉异常?`@contextmanager` 的 yield 和 finally 怎么配合?多个资源怎么一起管理?这篇文章把这些问题都讲清楚。 ## 上下文管理器解决什么问题 资源管理的核心要求:**用完必须释放,不管有没有异常**。不用上下文管理器就得写 try/finally: ```python # 笨办法 f = open('data.txt') try: data = f.read() finally: f.close() # with 语句:等价但简洁 with open('data.txt') as f: data = f.read() # f.close() 自动调用,即使 read() 抛异常 ``` `with` 语句保证 `__exit__` 一定被调用,省掉 finally 的样板代码。文件、数据库连接、锁、事务,都需要这种保证。 ## __enter__ 和 __exit__ 协议 上下文管理器是实现 `__enter__` 和 `__exit__` 的对象: ```python class Timer: def __enter__(self): import time self.start = time.perf_counter() return self # as 绑定的对象 def __exit__(self, exc_type, exc_val, exc_tb): import time elapsed = time.perf_counter() - self.start print(f"耗时: {elapsed:.4f}s") return False # 不吞异常,继续传播 with Timer() as t: time.sleep(1) # 耗时: 1.0012s ``` `__enter__` 返回值通过 `as` 绑定。`__exit__` 的三个参数是异常信息(没有异常时都是 None)。返回 `True` 表示吞掉异常,返回 `False`(或不返回)让异常继续传播。 ### __exit__ 该不该吞异常 大多数场景**不应该吞**——返回 False,让调用方处理异常。只有极少数场景需要吞:如 `suppress(FileNotFoundError)` 这种明确要忽略特定异常的。 ```python # 危险:吞掉所有异常,问题难以排查 def __exit__(self, exc_type, exc_val, exc_tb): return True # ❌ 任何异常都被静默忽略 # 正确:只吞特定异常 def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is FileNotFoundError: return True # 忽略文件不存在 return False # 其他异常继续传播 ``` ## @contextmanager:更简单的写法 手写 `__enter__` 和 `__exit__` 容易出错——`@contextmanager` 装饰器把上下文管理器变成生成器函数,逻辑更清晰: ```python from contextlib import contextmanager @contextmanager def timer(name="block"): import time start = time.perf_counter() try: yield # yield 处就是 with 块的代码 finally: elapsed = time.perf_counter() - start print(f"{name} 耗时: {elapsed:.4f}s") with timer("数据处理"): process_data() ``` `yield` 之前是 `__enter__`,`yield` 之后是 `__exit__`。`finally` 块保证清理逻辑一定执行。 ### yield 可以返回值 ```python @contextmanager def temp_directory(): import tempfile, shutil dirpath = tempfile.mkdtemp() try: yield dirpath # 返回临时目录路径 finally: shutil.rmtree(dirpath) # 用完删除 with temp_directory() as tmpdir: print(f"临时目录: {tmpdir}") # 在 tmpdir 里写文件... # tmpdir 已被删除 ``` ### 异常处理 `@contextmanager` 里 `yield` 抛出的异常会传播到 `with` 块,但如果在 yield 外面捕获了,就相当于吞掉: ```python @contextmanager def safe_operation(): try: yield except ValueError as e: print(f"捕获到 ValueError: {e}") # 不 re-raise,异常被吞掉 finally: print("清理完成") with safe_operation(): raise ValueError("出错了") # 输出:捕获到 ValueError: 出错了 # 清理完成 # 程序继续执行,不会崩溃 ``` 如果想在 `@contextmanager` 里**记录但不吞掉**异常,用 `raise` 重新抛出,或者不捕获让 finally 执行后自然传播。 ## contextlib 实用工具 ### suppress:忽略指定异常 替代 `try/except` + `pass` 的惯用法: ```python from contextlib import suppress # 以前 try: os.remove('temp.txt') except FileNotFoundError: pass # 现在 with suppress(FileNotFoundError): os.remove('temp.txt') ``` 可以忽略多种异常:`suppress(FileNotFoundError, PermissionError)`。 ### closing:给有 close() 方法的对象加 with 支持 ```python from contextlib import closing from urllib.request import urlopen with closing(urlopen('https://example.com')) as response: data = response.read() # response.close() 自动调用 ``` ### redirect_stdout/redirect_stderr:临时重定向输出 ```python from contextlib import redirect_stdout import io output = io.StringIO() with redirect_stdout(output): print("这行不会显示在终端") captured = output.getvalue() # "这行不会显示在终端 " ``` 适合测试里捕获 print 输出,或者把进度信息写到日志文件而不是终端。 ### ExitStack:动态管理多个上下文 不确定需要打开多少个资源时用 ExitStack: ```python from contextlib import ExitStack files = ['a.txt', 'b.txt', 'c.txt'] with ExitStack() as stack: handles = [stack.enter_context(open(f)) for f in files] # 三个文件都打开了,任何一个打开失败,之前打开的会自动关闭 for h in handles: process(h) # 三个文件全部自动关闭 ``` 也可以用 `callback` 注册清理函数: ```python with ExitStack() as stack: stack.callback(print, "清理完成") do_something() # 无论 do_something 是否抛异常,"清理完成" 都会打印 ``` ## 异步上下文管理器 Python 3.5+ 支持 `async with`,对应 `__aenter__` 和 `__aexit__`: ```python class AsyncDBConnection: async def __aenter__(self): self.conn = await create_connection() return self.conn async def __aexit__(self, exc_type, exc_val, exc_tb): await self.conn.close() return False async def main(): async with AsyncDBConnection() as conn: await conn.execute("SELECT 1") ``` `@asynccontextmanager` 是异步版本: ```python from contextlib import asynccontextmanager @asynccontextmanager async def db_transaction(pool): conn = await pool.acquire() try: yield conn finally: await pool.release(conn) async def main(): async with db_transaction(pool) as conn: await conn.execute("INSERT ...") ``` FastAPI 和 SQLAlchemy 的异步数据库会话就是用这个模式。 ## 实际应用场景 ### 数据库事务 ```python @contextmanager def transaction(conn): try: yield conn conn.commit() except Exception: conn.rollback() raise with transaction(conn): conn.execute("INSERT ...") conn.execute("UPDATE ...") # 两条语句要么都成功,要么都回滚 ``` ### 临时修改环境变量 ```python @contextmanager def env_var(key, value): import os old = os.environ.get(key) os.environ[key] = value try: yield finally: if old is None: os.environ.pop(key, None) else: os.environ[key] = old with env_var('DATABASE_URL', 'sqlite:///:memory:'): run_tests() # 测试时用内存数据库 # DATABASE_URL 恢复原值 ``` ### 线程锁 `threading.Lock` 本身就是上下文管理器: ```python import threading lock = threading.Lock() with lock: # 临界区,自动加锁/释放锁 update_shared_data() ``` 比 `lock.acquire() ... lock.release()` 安全——不会因为异常导致锁不释放。
服务端6月2日 01:38
Python 装饰器是怎么工作的?@ 语法糖、执行时机和 wraps 详解装饰器的本质是一个接收函数作为参数并返回新函数的高阶函数。`@decorator` 语法糖等价于 `func = decorator(func)`。理解装饰器的关键:它只是函数替换,在定义时执行,不是调用时。 ## 装饰器做了什么 ```python def log(func): def wrapper(*args, **kwargs): print(f"Calling {func.__name__}") return func(*args, **kwargs) return wrapper @log def hello(name): print(f"Hello, {name}") # 等价于 hello = log(hello) ``` `@log` 发生在函数定义时,不是调用时。Python 解释器看到 `@log` 后,把 `hello` 传给 `log()`,返回的 `wrapper` 替换掉原来的 `hello`。之后所有对 `hello()` 的调用实际上调的是 `wrapper()`。 ## 执行时机 ```python def log(func): print("装饰器执行了") # 模块加载时就执行 def wrapper(*args, **kwargs): print("wrapper 执行了") # 每次调用函数时执行 return func(*args, **kwargs) return wrapper @log def hello(): print("hello") # 输出: 装饰器执行了(定义时立即执行) hello() # 输出: wrapper 执行了 / hello ``` 装饰器外层的代码在模块加载时执行(一次),wrapper 内的代码在每次函数调用时执行。 ## 多个装饰器的叠加 ```python @a @b def f(): pass # 等价于 f = a(b(f)) ``` 从下到上装饰。调用 `f()` 时,执行顺序是 a -> b -> f -> b 的后处理 -> a 的后处理。 ## 带参数的装饰器 需要三层嵌套——最外层接收装饰器参数,中间层接收被装饰函数,最内层是 wrapper: ```python def retry(max_attempts=3): def decorator(func): def wrapper(*args, **kwargs): for i in range(max_attempts): try: return func(*args, **kwargs) except Exception: if i == max_attempts - 1: raise return wrapper return decorator @retry(max_attempts=5) def call_api(): ... # 等价于 call_api = retry(max_attempts=5)(call_api) ``` `@retry(5)` 先调用 `retry(5)` 返回 `decorator`,再 `decorator(call_api)` 返回 `wrapper`。三层嵌套是固定模式。 ## functools.wraps 装饰器替换了原函数,导致 `__name__`、`__doc__` 等元信息丢失。`@wraps(func)` 复制原函数的元信息到 wrapper: ```python from functools import wraps def log(func): @wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper @log def hello(): pass print(hello.__name__) # hello(没有 @wraps 会是 wrapper) ``` ## 装饰器 vs 闭包 装饰器是基于闭包的。`wrapper` 闭包了 `func` 变量——每次调用 `wrapper` 时都能访问到被装饰的原始函数。装饰器就是"创建闭包的工厂函数"。 ## 追问 ### 装饰器能装饰类吗? 能。装饰器接收的参数不一定是函数,也可以是类: ```python def add_repr(cls): def __repr__(self): return f"{cls.__name__}({self.__dict__})" cls.__repr__ = __repr__ return cls @add_repr class User: def __init__(self, name): self.name = name ``` dataclass 的 `@dataclass` 就是类装饰器。 ### 装饰器有性能开销吗? 有一层函数调用的开销,通常可以忽略。但在极高性能场景(百万次/秒调用),装饰器的额外调用栈可能成为瓶颈。可以用 `functools.lru_cache` 缓存结果避免重复执行。
服务端6月2日 01:37
Python GIL 是什么?为什么多线程不能利用多核?怎么绕过?GIL(Global Interpreter Lock)是 CPython 的一把全局互斥锁,同一时刻只允许一个线程执行 Python 字节码。这意味着 Python 多线程无法利用多核 CPU 做计算密集型任务。但 IO 密集型任务不受影响——线程等待 IO 时会释放 GIL。 ## GIL 为什么存在 CPython 的内存管理(引用计数)不是线程安全的。如果多个线程同时修改 `ob_refcnt`,可能导致内存泄漏或提前释放。GIL 是最简单的解决方案——一个线程执行 Python 代码时,其他线程不能运行。 为什么不去掉?Python 核心开发者试过多次,去掉 GIL 会导致单线程性能下降 10-30%(因为要加细粒度锁替代全局锁),C 扩展也需要大量改写。社区不愿意接受这个代价。 Python 3.13 引入了实验性的 free-threaded 模式(PEP 703),允许禁用 GIL,但目前还是实验阶段,性能和兼容性都不成熟。 ## GIL 的影响范围 | 任务类型 | 多线程表现 | 原因 | |----------|-----------|------| | CPU 密集 | 比单线程还慢 | 线程争抢 GIL,切换开销大 | | IO 密集 | 有效加速 | 等 IO 时释放 GIL,其他线程可运行 | | 混合型 | 部分有效 | 计算部分受 GIL 限制 | CPU 密集比单线程还慢是因为 GIL 切换本身有开销,多线程反而增加了竞争。 ## 绕过 GIL 的三种方式 **1. 多进程(最常用)** ```python from multiprocessing import Pool def heavy_compute(n): return sum(i * i for i in range(n)) with Pool(4) as p: results = p.map(heavy_compute, [10**7] * 4) # 4 个进程并行 ``` 每个进程有独立的 GIL,真正并行。缺点:进程间通信成本高(数据要序列化),启动慢。 **2. C 扩展释放 GIL** 在 C 扩展中做计算密集型操作时,可以手动释放 GIL: ```c Py_BEGIN_ALLOW_THREADS // 这里做纯 C 计算,不操作 Python 对象 result = heavy_computation(data); Py_END_ALLOW_THREADS ``` NumPy、Pandas、hashlib 等库在底层 C 代码中释放了 GIL,所以它们在多线程中可以并行运行。 **3. 用 asyncio 替代多线程** IO 密集型任务不需要多线程,用协程就够了: ```python import asyncio import aiohttp async def fetch(url): async with aiohttp.ClientSession() as session: async with session.get(url) as resp: return await resp.text() async def main(): tasks = [fetch(url) for url in urls] results = await asyncio.gather(*tasks) ``` 单线程 + 事件循环,没有 GIL 问题,也没有线程切换开销。 ## GIL 的释放时机 CPython 在两种情况下释放 GIL: 1. **IO 操作**:网络请求、文件读写、time.sleep() 等 2. **C 扩展主动释放**:NumPy 运算、hashlib 哈希等 纯 Python 代码(循环、计算、字符串操作)永远不会释放 GIL。 ## 追问 ### Python 3.13 的 no-GIL 模式能用吗? 实验阶段,不推荐生产使用。性能比有 GIL 模式慢约 10-15%,很多 C 扩展还不兼容。等 Python 3.14/3.15 稳定后再考虑。 ### 多线程在 Python 里完全没用吗? 不是。IO 密集型场景(网络爬虫、API 调用、数据库查询)多线程比单线程快得多。只是 CPU 密集型场景多线程没有加速效果。
服务端6月2日 01:36
Python 迭代器和生成器有什么区别?yield 和迭代器协议详解迭代器是实现了 `__iter__` 和 `__next__` 方法的对象,生成器是用 `yield` 关键字自动创建的迭代器。生成器是迭代器的子集——所有生成器都是迭代器,但迭代器不一定是生成器。核心区别:生成器更简洁,且天然支持惰性求值。 ## 迭代器协议 迭代器必须实现两个方法: - `__iter__`:返回 self(让迭代器本身也可迭代) - `__next__`:返回下一个值,没有值时抛出 StopIteration ```python class Countdown: def __init__(self, start): self.current = start def __iter__(self): return self def __next__(self): if self.current <= 0: raise StopIteration self.current -= 1 return self.current + 1 for n in Countdown(3): print(n) # 3, 2, 1 ``` 手动实现迭代器要写 `__iter__`、`__next__`、维护状态、处理 StopIteration。代码量多,容易写错。 ## 生成器:迭代器的语法糖 生成器用 `yield` 关键字,Python 自动实现迭代器协议: ```python def countdown(start): current = start while current > 0: yield current current -= 1 for n in countdown(3): print(n) # 3, 2, 1 ``` 调用 `countdown(3)` 不会执行函数体,而是返回一个生成器对象。每次 `next()` 执行到 `yield` 暂停并返回值,下次 `next()` 从暂停处继续。 3 行代码 vs 10 行代码,效果完全一样。这就是为什么 Python 社区几乎总是用生成器而不是手写迭代器。 ## 生成器表达式 类似列表推导式,但用圆括号,惰性求值: ```python # 列表推导式 — 一次性生成所有数据 squares = [x**2 for x in range(1000000)] # 占用大量内存 # 生成器表达式 — 按需生成 squares = (x**2 for x in range(1000000)) # 几乎不占内存 # 在 sum/max/min 等函数里直接用 total = sum(x**2 for x in range(1000000)) ``` 处理大数据时,生成器表达式是列表推导式的直接替代。 ## yield from:委托给子生成器 `yield from` 把迭代委托给另一个生成器,避免手写 for 循环: ```python def flatten(nested): for item in nested: if isinstance(item, list): yield from flatten(item) # 递归展开 else: yield item list(flatten([1, [2, [3, 4]], 5])) # [1, 2, 3, 4, 5] ``` `yield from` 不只是语法糖——它还正确处理了 `send()`、`throw()`、`close()` 等生成器方法的传递。 ## 生成器做协程 `yield` 不只能返回值,还能接收值(通过 `send()`),这让生成器可以用来实现协程: ```python def accumulator(): total = 0 while True: value = yield total if value is None: break total += value gen = accumulator() next(gen) # 启动生成器,返回 0 gen.send(10) # 返回 10 gen.send(20) # 返回 30 ``` Python 3.5+ 推荐用 `async/await` 替代这种用法,但理解 `yield` 的双向通信有助于理解协程原理。 ## 追问 ### 迭代器只能遍历一次吗? 是的。迭代器是有状态的,遍历完就空了。要重新遍历,需要创建新的迭代器。可迭代对象(如列表)每次调用 `iter()` 都返回新的迭代器。 ### 生成器的内存优势有多大? 读 10GB 日志文件,`for line in open("log.txt")` 只占几 KB 内存(每次读一行)。`readlines()` 会把整个文件加载到内存。差距在数据量大时非常显著。
服务端6月2日 01:36
Python 内存管理是怎样的?引用计数、分代 GC 和内存池原理Python 内存管理分三层:引用计数(主要)、垃圾回收(处理循环引用)、内存池(减少 malloc 开销)。日常开发不需要手动管理内存,但理解机制能帮你排查内存泄漏。 ## 引用计数:核心机制 每个对象维护一个引用计数 `ob_refcnt`。引用增加时 +1,引用减少时 -1,归零时立即释放。 ```python import sys a = [1, 2, 3] # 引用计数 1 b = a # 引用计数 2 c = a # 引用计数 3 print(sys.getrefcount(a)) # 4(多 1 是因为 getrefcount 参数本身也是引用) del b # 引用计数 2 c = None # 引用计数 1 # a 离开作用域后引用计数归零,内存释放 ``` 引用计数的优势:实时释放,不需要暂停程序做垃圾回收。劣势:无法处理循环引用。 ## 循环引用问题 ```python a = [] b = [] a.append(b) # a 引用 b b.append(a) # b 引用 a del a, b # 引用计数各剩 1(互相引用),永远不会归零 ``` 引用计数对循环引用无能为力。Python 用分代垃圾回收(GC)处理这种情况。 ## 分代垃圾回收 GC 把对象分成三代:第 0 代(新对象)、第 1 代、第 2 代(长寿对象)。 - 新创建的对象在第 0 代 - 经过一次 GC 存活的对象晋升到下一代 - 第 0 代 GC 最频繁(阈值约 700 个对象),第 2 代最少 分代回收的理论依据:大部分对象很快变成垃圾(如函数内的临时变量),长寿对象倾向于一直活着。只频繁检查年轻对象,减少 GC 开销。 ```python import gc print(gc.get_threshold()) # (700, 10, 10) — 第0代阈值700,每10次第0代GC触发1次第1代 print(gc.get_count()) # 当前各代对象计数 ``` 手动触发 GC:`gc.collect()`。通常不需要手动调用,但在处理大量循环引用对象后可以主动回收。 ## 内存池:pymalloc Python 不直接用系统的 malloc/free 管理小对象,而是用自己实现的 pymalloc 内存池: - **小对象(<512 字节)**:由 pymalloc 从大块内存中分配,减少系统调用 - **大对象(>=512 字节)**:直接用系统 malloc 内存池按 256 KB 的 arena 分块,arena 内按 4 KB 的 pool 分块,pool 内按固定大小的 block 分配。相同大小的 block 共享 pool,减少碎片。 这就是为什么 Python 进程的 RSS(常驻内存)不会随着对象释放而下降——pymalloc 保留空闲 arena 供后续使用,不归还给操作系统。 ## 内存泄漏排查 Python 的内存泄漏通常是"对象被意外引用导致无法释放"而非真正的泄漏。 **1. 用 objgraph 找到引用链** ```python import objgraph objgraph.show_backrefs(objgraph.by_type("dict")[0], max_depth=5) ``` 生成引用关系图,找到谁在持有不该持有的引用。 **2. 用 tracemalloc 定位分配位置** ```python import tracemalloc tracemalloc.start() # ... 运行代码 ... snapshot = tracemalloc.take_snapshot() top_stats = snapshot.statistics("lineno") for stat in top_stats[:10]: print(stat) ``` 显示内存分配最多的代码行。 **3. 常见泄漏原因** - 全局列表/字典不断追加但从不清理 - 闭包或回调持有大对象引用 - `__del__` 方法导致对象无法被 GC 回收(Python 3.4 已改善) - 缓存没有上限(用 `functools.lru_cache` 替代手动缓存) ## 追问 ### 为什么 Python 进程的内存只增不减? pymalloc 保留空闲 arena 不归还操作系统。如果你看到 RSS 很高但对象不多,可能是内存碎片。`malloc_trim(0)` 可以强制归还(Linux),但不保证生效。 ### 引用计数和 GC 哪个先执行? 引用计数是实时的,每次赋值/删除都更新。GC 是周期性运行的,只处理循环引用。两者配合工作。
服务端6月2日 01:35
Python 异常处理怎么写?try/except/else/finally 和自定义异常详解Python 用 try/except 捕获异常,else 放无异常时执行的代码,finally 放无论如何都执行的清理逻辑。自定义异常继承 Exception,让错误类型可区分。 ## 基本结构 ```python try: result = 10 / 0 except ZeroDivisionError: print("除零错误") except (TypeError, ValueError) as e: print(f"类型或值错误: {e}") else: print("没有异常时执行") finally: print("无论如何都执行") ``` - `else` 在 try 块没有抛异常时执行,适合放"成功后才做的事" - `finally` 总是执行,即使 try 里有 return 或 except 里有 raise。用于释放资源(关闭文件、断开连接) ## 异常继承体系 所有异常继承自 `BaseException`,日常使用继承 `Exception`: ``` BaseException +-- SystemExit # sys.exit() +-- KeyboardInterrupt # Ctrl+C +-- Exception # 所有业务异常的基类 +-- ValueError +-- TypeError +-- KeyError +-- FileNotFoundError ``` 不要捕获 `BaseException`——`except:` 或 `except BaseException:` 会把 `KeyboardInterrupt` 和 `SystemExit` 也吞掉,程序无法正常退出。永远用 `except Exception:` 或更具体的异常类型。 ## 自定义异常 ```python class BusinessError(Exception): """业务逻辑错误基类""" pass class InsufficientBalance(BusinessError): def __init__(self, balance, amount): self.balance = balance self.amount = amount super().__init__(f"余额不足: 余额 {balance}, 需要 {amount}") try: withdraw(account, amount) except InsufficientBalance as e: print(e) print(e.balance) # 100 ``` 自定义异常比直接 `raise ValueError("余额不足")` 好在:调用方可以按类型捕获(`except InsufficientBalance`),而不是解析错误消息字符串。 ## 异常链:raise from 捕获一个异常后抛出另一个,保留原始异常的堆栈: ```python try: data = json.loads(raw) except json.JSONDecodeError as e: raise DataFormatError("数据格式错误") from e ``` `from e` 让新异常的 `__cause__` 指向原始异常。调试时能看到完整的异常链。不加 `from e`,原始异常的堆栈信息会丢失。 ## 常见反模式 **1. 空 except 吞掉所有错误** ```python try: do_something() except: # 错误!吞掉 KeyboardInterrupt pass # 正确做法 except Exception as e: log.error(f"操作失败: {e}") ``` **2. except 块太宽** ```python try: value = data["key"] result = int(value) except Exception: # 太宽,KeyError 和 ValueError 混在一起 ... # 正确:分别处理 except KeyError: result = 0 except ValueError: result = -1 ``` **3. 用异常做流程控制** ```python # 错误:用异常判断 key 是否存在 try: value = d[key] except KeyError: value = default # 正确:用 get 或 in value = d.get(key, default) ``` 异常比条件判断慢 10-100 倍。只对真正的"异常"情况用异常,正常流程用条件判断。 ## 追问 ### finally 里的 return 会覆盖 try 里的 return 吗? 会。永远不要在 finally 里写 return。 ```python def foo(): try: return 1 finally: return 2 # 覆盖 try 里的 print(foo()) # 2 ```
服务端6月2日 01:33
Python 装饰器高级用法有哪些?带参数装饰器、类装饰器和 functools.wraps 详解装饰器的高级用法围绕三个问题:怎么传参数、怎么保持被装饰函数的元信息、什么时候用类而不是函数写装饰器。 ## 带参数的装饰器 普通装饰器只能装饰函数,不能接收额外参数。需要参数时,加一层嵌套: ```python def retry(max_attempts=3, delay=1): def decorator(func): def wrapper(*args, **kwargs): for attempt in range(max_attempts): try: return func(*args, **kwargs) except Exception as e: if attempt == max_attempts - 1: raise time.sleep(delay) return wrapper return decorator @retry(max_attempts=5, delay=2) def call_api(): ... ``` `@retry(5, 2)` 先调用 `retry(5, 2)` 返回 `decorator`,再 `decorator(call_api)` 返回 `wrapper`。三层嵌套是带参数装饰器的固定模式。 ## functools.wraps:保持函数身份 装饰器替换了原函数,导致 `__name__`、`__doc__` 等元信息丢失: ```python def log(func): def wrapper(*args, **kwargs): print(f"Calling {func.__name__}") return func(*args, **kwargs) return wrapper @log def hello(): pass print(hello.__name__) # 'wrapper' 而不是 'hello' ``` 加 `@wraps(func)` 把原函数的元信息复制到 wrapper: ```python from functools import wraps def log(func): @wraps(func) def wrapper(*args, **kwargs): print(f"Calling {func.__name__}") return func(*args, **kwargs) return wrapper print(hello.__name__) # 'hello' ``` `@wraps` 必须加——调试时看到 `wrapper` 而不是实际函数名,排查问题会非常痛苦。 ## 类装饰器 用类写装饰器,适合需要维护状态的场景: ```python class CountCalls: def __init__(self, func): self.func = func self.count = 0 wraps(func)(self) # 保持元信息 def __call__(self, *args, **kwargs): self.count += 1 print(f"{self.func.__name__} called {self.count} times") return self.func(*args, **kwargs) @CountCalls def say_hello(): print("Hello") say_hello() # say_hello called 1 times; Hello say_hello() # say_hello called 2 times; Hello ``` `__init__` 接收被装饰的函数,`__call__` 每次调用时执行。`self.count` 跨调用持久化——函数写装饰器用闭包变量存状态,类装饰器用实例属性,语义更清晰。 ## 装饰类 装饰器不只能装饰函数,还能装饰整个类: ```python def add_repr(cls): def __repr__(self): attrs = ', '.join(f'{k}={v!r}' for k, v in self.__dict__.items()) return f'{cls.__name__}({attrs})' cls.__repr__ = __repr__ return cls @add_repr class User: def __init__(self, name, age): self.name = name self.age = age print(User('Alice', 30)) # User(name='Alice', age=30) ``` dataclass 的 `@dataclass` 就是类装饰器——自动生成 `__init__`、`__repr__`、`__eq__`。 ## 常见的高级装饰器模式 **1. 缓存/记忆化** ```python from functools import lru_cache @lru_cache(maxsize=128) def expensive(n): return sum(i ** 2 for i in range(n)) ``` 标准库自带,不用自己写。`maxsize=None` 无限缓存。 **2. 类型检查** ```python def validate(**types): def decorator(func): @wraps(func) def wrapper(**kwargs): for name, expected in types.items(): if name in kwargs and not isinstance(kwargs[name], expected): raise TypeError(f'{name} must be {expected}') return func(**kwargs) return wrapper return decorator @validate(age=int, name=str) def create_user(name, age): ... ``` **3. 超时控制** ```python import signal def timeout(seconds): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): def handler(signum, frame): raise TimeoutError(f'{func.__name__} timed out') old = signal.signal(signal.SIGALRM, handler) signal.alarm(seconds) try: return func(*args, **kwargs) finally: signal.alarm(0) signal.signal(signal.SIGALRM, old) return wrapper return decorator ``` ## 追问 ### 多个装饰器的执行顺序? 从下到上装饰,从上到下执行。`@a @b def f()` 等价于 `a(b(f))`,调用 `f()` 时先执行 `a` 的逻辑,再执行 `b` 的逻辑,最后执行 `f`。 ### 装饰器和 AOP 是什么关系? 装饰器是 Python 实现 AOP(面向切面编程)的方式。日志、权限、缓存这些"横切关注点"用装饰器统一处理,不侵入业务代码。
服务端6月2日 01:32
Python 元类是什么?type 怎么创建类?元类什么时候该用?元类是创建类的类。普通类创建实例,元类创建类。Python 里 `class` 语句本质上是调用 `type()` 来创建类对象,元类让你拦截这个过程,在类创建时自动修改类的属性、方法、继承关系。 ## Python 类的创建过程 ```python class Foo: x = 1 ``` 这行代码执行时,Python 做了这件事: ```python Foo = type('Foo', (object,), {'x': 1}) ``` `type(类名, 父类元组, 属性字典)` 就是创建类的底层调用。`type` 本身就是一个元类——所有类都是 `type` 的实例。 ```python print(type(Foo)) # <class 'type'> print(type(Foo())) # <class '__main__.Foo'> ``` `type(Foo)` 是 `type`,说明 Foo 类是 type 的实例。`type(Foo())` 是 Foo,说明 Foo() 实例是 Foo 的实例。 ## 自定义元类 继承 `type`,重写 `__new__` 或 `__init__`: ```python class Meta(type): def __new__(cls, name, bases, dct): # 在类创建前修改属性字典 dct['created_by_meta'] = True return super().__new__(cls, name, bases, dct) class Foo(metaclass=Meta): x = 1 print(Foo.created_by_meta) # True ``` `__new__` 在类对象创建之前调用,可以修改 `dct`(属性字典)。`__init__` 在类对象创建之后调用,可以修改已创建的类。大多数场景用 `__new__` 就够了。 ## 实际用途 **1. 自动注册子类** ORM、插件系统常用——每定义一个子类就自动注册到一个全局字典里: ```python class RegistryMeta(type): _registry = {} def __new__(cls, name, bases, dct): klass = super().__new__(cls, name, bases, dct) if name != 'Base': # 跳过基类本身 cls._registry[name] = klass return klass class Base(metaclass=RegistryMeta): pass class User(Base): pass class Order(Base): pass print(RegistryMeta._registry) # {'User': <class 'User'>, 'Order': <class 'Order'>} ``` Django 的 ModelBase 就是这种模式——每个 Model 子类自动注册到 Django 的 app registry。 **2. 接口检查** 强制子类实现特定方法: ```python class InterfaceMeta(type): def __new__(cls, name, bases, dct): if bases: # 不是基类本身 required = getattr(bases[0], '_required_methods', []) for method in required: if method not in dct: raise TypeError(f'{name} 必须实现 {method}') return super().__new__(cls, name, bases, dct) class Animal(metaclass=InterfaceMeta): _required_methods = ['speak'] class Dog(Animal): def speak(self): return 'Woof!' class Cat(Animal): pass # TypeError: Cat 必须实现 speak ``` **3. 单例模式** ```python class SingletonMeta(type): _instances = {} def __call__(cls, *args, **kwargs): if cls not in cls._instances: cls._instances[cls] = super().__call__(*args, **kwargs) return cls._instances[cls] class Database(metaclass=SingletonMeta): pass ``` 重写 `__call__` 拦截实例化过程——第一次创建实例,后续返回同一个。 ## 追问 ### 元类和类装饰器有什么区别? 类装饰器在类创建之后修改,元类在类创建过程中修改。类装饰器更简单透明,优先用装饰器。元类能控制 `__new__` 和 `__init__`,能做的事更多但也更难调试。 ### 什么时候该用元类? 几乎不需要。Python 社区有一句话:"如果你不确定是否需要元类,你就不需要。"95% 的场景用类装饰器、`__init_subclass__`、或普通继承就能解决。元类适合框架作者(Django ORM、SQLAlchemy),应用层开发者很少需要。
服务端6月2日 01:31
Python 闭包是什么?变量怎么被记住的?闭包是一个函数记住了自己被创建时的作用域里的变量,即使那个作用域已经执行完毕。Python 里闭包最常见的用途:工厂函数、延迟计算、替代简单类。 ## 闭包怎么产生的 ```python def make_counter(start=0): count = start def counter(): nonlocal count count += 1 return count return counter c = make_counter(10) print(c()) # 11 print(c()) # 12 ``` `make_counter` 执行完了,`count` 变量按理应该被销毁。但 `counter` 函数内部引用了 `count`,Python 会把 `count` 和 `counter` 绑在一起——这就是闭包。`counter` 闭包了 `count` 变量。 `nonlocal` 声明告诉 Python `count` 不是局部变量,而是外层作用域的变量。不加 `nonlocal`,`count += 1` 会在 `counter` 内部创建一个新的局部变量 `count`,而不是修改外层的。 ## 闭包存储在哪里 闭包变量存在函数的 `__closure__` 属性里: ```python print(c.__closure__) # (<cell at 0x...: int object at ...>,) print(c.__closure__[0].cell_contents) # 12 ``` 每个被闭包的变量是一个 cell 对象。这就是 Python 实现闭包的底层机制。 ## 常见用途 **1. 工厂函数** 根据参数生成不同的函数: ```python def power(exp): def f(base): return base ** exp return f square = power(2) cube = power(3) print(square(5)) # 25 print(cube(5)) # 125 ``` **2. 缓存/记忆化** ```python def memoize(fn): cache = {} def wrapper(*args): if args not in cache: cache[args] = fn(*args) return cache[args] return wrapper @memoize def fib(n): if n < 2: return n return fib(n-1) + fib(n-2) print(fib(100)) # 瞬间出结果 ``` `cache` 被 `wrapper` 闭包,不需要全局变量。这就是装饰器能工作的基础——装饰器本质上就是闭包。 **3. 替代简单类** 只有状态 + 一个方法的场景,闭包比类轻量: ```python # 闭包方式 def make_accumulator(initial=0): total = initial def add(value): nonlocal total total += value return total return add acc = make_accumulator() print(acc(10)) # 10 print(acc(20)) # 30 ``` ## 追问 ### 闭包和类怎么选? 有多个方法或复杂状态管理用类。只有一个操作 + 简单状态用闭包。闭包更函数式,类更面向对象。 ### 闭包会导致内存泄漏吗? 会。被闭包的变量不会在函数返回后释放,只要闭包函数还活着,变量就一直在。长期存活的闭包(如事件监听器、回调)要注意。 ### lambda 能形成闭包吗? 能。`lambda x: x + offset` 里的 `offset` 就是被闭包的变量。但 lambda 不能用 `nonlocal`,所以无法修改外层变量,只能读取。
服务端5月29日 00:09
Python 多线程和多进程有什么区别?GIL 对多线程有什么影响?核心区别:进程是资源分配单位,有独立内存空间;线程是调度单位,共享进程内存。在 Python 里这道题的特殊之处是 GIL——全局解释器锁让同一时刻只有一个线程执行 Python 字节码,所以 CPU 密集型任务用多线程不会加速,必须用多进程。I/O 密集型任务多线程够用,因为等 I/O 时 GIL 会释放。 ## 追问 ### GIL 到底锁的是什么?为什么不能去掉? GIL 保护的是 CPython 的内存管理——引用计数。CPython 的对象引用计数不是线程安全的,如果多个线程同时修改引用计数,对象可能被提前释放或泄漏。加锁是最简单的方案,代价是多线程无法真正并行。Python 3.13 开始实验性支持 free-threaded 模式(PEP 703),试图移除 GIL,但目前生态兼容性还是问题。短期内 GIL 不会消失。 ### 多线程既然不能并行,还有用吗? 有用。线程等待 I/O(网络请求、文件读写、数据库查询)时会释放 GIL,其他线程可以执行。所以 I/O 密集型场景(爬虫、API 调用、数据库操作)用多线程完全没问题。实测:100 个 HTTP 请求,单线程串行 10 秒,10 线程并发约 1.2 秒,接近 10 倍加速。但如果是纯计算(比如大矩阵运算),10 线程和 1 线程耗时几乎一样,甚至更慢——线程切换本身有开销。 ### 什么时候用多进程?有什么坑? CPU 密集型任务:数据处理、图像处理、机器学习训练。`multiprocessing.Pool` 是最常用的接口。最大的坑是进程间通信开销——进程不共享内存,数据要序列化传输。传一个大 numpy 数组给子进程,pickle 序列化的时间可能比计算本身还长。解决方案:用 `multiprocessing.shared_memory`(Python 3.8+)或 `multiprocessing.Array` 共享内存,避免序列化。另一个坑是子进程的异常不会自动传播到主进程,需要手动处理。 ### 协程和多线程有什么区别? 协程是用户态的协作式调度,线程是内核态的抢占式调度。协程的切换由代码控制(await),线程的切换由操作系统控制。协程没有锁的问题——同一个时刻只有一个协程在执行,不存在竞态条件。代价是协程里不能有阻塞调用,一阻塞整个事件循环就卡住了。asyncio 适合高并发 I/O(上万连接的聊天服务器),threading 适合少量 I/O 并发或需要兼容阻塞库的场景。 ## 写段代码 ```python from multiprocessing import Pool import time def heavy_compute(n): return sum(i * i for i in range(n)) if __name__ == '__main__': # 多进程:4核并行,接近4倍加速 with Pool(4) as p: results = p.map(heavy_compute, [10**7] * 4) print(results) # 对比:多线程对CPU密集型无效 # 4个线程和1个线程耗时几乎一样(GIL) ```
服务端5月29日 00:07
Python 描述符是什么?数据描述符和非数据描述符优先级怎么排?描述符是实现了 `__get__`、`__set__`、`__delete__` 中任意一个的类,被赋值给另一个类的类属性后,会拦截那个属性的访问。Python 的属性查找有一套隐藏规则:当解释器在类(及其 MRO)的 `__dict__` 里找到的值是描述符时,不会直接返回它,而是调用描述符的 `__get__` 方法。这就是 `property`、`classmethod`、`staticmethod` 的底层原理——它们都是描述符。 ## 追问 ### 数据描述符和非数据描述符有什么区别?优先级怎么排? 关键区别是有没有 `__set__`。实现了 `__get__` + `__set__` 的叫数据描述符,只有 `__get__` 的叫非数据描述符。优先级:数据描述符 > 实例 `__dict__` > 非数据描述符。换句话说,数据描述符能拦截赋值操作,实例 `__dict__` 里写不进去;非数据描述符拦截不了,一旦实例 `__dict__` 有了同名 key 就被覆盖了。这就是为什么 `property`(数据描述符)设了 setter 后 `obj.x = 1` 一定走 setter,而普通方法(非数据描述符)可以被实例属性遮蔽。 ### Python 属性查找的完整顺序是什么? 按这个顺序:1. 类及其 MRO 的 `__dict__` 里找,如果是数据描述符就调 `__get__` 返回;2. 实例 `__dict__` 里找;3. 回到类的 `__dict__`,如果是非数据描述符就调 `__get__` 返回。这解释了一个经典面试题:为什么实例能覆盖普通方法但覆盖不了 property?因为方法是非数据描述符,步骤 2 的实例 `__dict__` 优先级更高;property 是数据描述符,步骤 1 就截走了。 ### __set_name__ 是什么?为什么需要它? Python 3.6 新增的钩子。描述符被赋值到类属性时,解释器自动调用 `desc.__set_name__(owner, name)`,把属性名传进去。之前描述符不知道自己叫什么名字,要么手动传(`age = Typed('age', int)`),要么用元类扫描类 `__dict__` 来推断。有了 `__set_name__`,Django ORM 的 `name = CharField()` 就不用重复写字段名了——`CharField.__set_name__` 会自动收到 `'name'`。 ### 描述符里怎么存值?为什么不能直接用 self.xxx? 描述符实例是类级别的,所有实例共享同一个描述符对象。如果你在 `__set__` 里写 `self.value = val`,所有实例共享同一个 value,后面的赋值会覆盖前面的。正确做法是存到 `obj.__dict__[self.name]` 里,或者用 `weakref.WeakKeyDictionary` 做 `self` → `value` 的映射。前一种更常见(property 就这么做的),后一种适合描述符本身需要维护额外状态的场景。 ## 写段代码 ```python # 用 __set_name__ 实现类型检查描述符 class Typed: def __init__(self, expected_type): self.expected_type = expected_type def __set_name__(self, owner, name): self.name = name # 自动获取属性名 self.storage = f'_{name}' def __get__(self, obj, objtype=None): if obj is None: return self return getattr(obj, self.storage, None) def __set__(self, obj, value): if not isinstance(value, self.expected_type): raise TypeError(f'{self.name} 需要 {self.expected_type.__name__}') setattr(obj, self.storage, value) class User: name = Typed(str) age = Typed(int) u = User() u.name = "Alice" # OK u.age = 25 # OK # u.age = "25" # TypeError: age 需要 int ```
服务端5月29日 00:06
Python 列表推导式和生成器表达式有什么区别?什么时候该用哪个?区别就一个字:方括号 `[]` 立即算出所有结果放进列表,圆括号 `()` 返回一个生成器对象,用到哪个才算哪个。`[x**2 for x in range(10)]` 执行完内存里就有 10 个数;`(x**2 for x in range(10))` 执行完只多了一个 200 字节的生成器对象,值还没算。 ## 追问 ### 生成器只能遍历一次,踩过坑吗? 这是最常见的陷阱。你写 `gen = (x for x in range(5))`,第一次 `list(gen)` 得到 `[0,1,2,3,4]`,再 `list(gen)` 就是 `[]`——生成器耗尽了。如果后面的代码还要用,要么转成列表存起来,要么重新创建生成器。调试时这个坑尤其烦人:你在调试器里 `print(list(gen))` 看了一眼,后面代码就拿不到数据了。 ### sum(x**2 for x in range(N)) 和 sum([x**2 for x in range(N)]) 哪个快? 大多数人觉得生成器快,实际不一定。生成器省内存是肯定的,但每次 yield 有函数调用开销。列表推导式的循环在 C 层执行(CPython 实现中 listcomp 是专用字节码),而生成器每次 yield 要切换栈帧。数据量小时列表版反而更快——省下的内存分配开销比 yield 开销小。数据量大时生成器版才赢,因为列表版要先把所有结果存内存。经验值:N < 1000 差别可以忽略,N > 10 万生成器才有明显优势。 ### 字典推导式和集合推导式呢? 语法一样,换括号就行:`{k: v for k, v in pairs}` 是字典推导式,`{x for x in items}` 是集合推导式。注意集合推导式没有生成器版本——`{x for x in items}` 是立即求值的,没有惰性集合。如果需要惰性去重,得用生成器 + `set()` 分两步。 ### 嵌套推导式怎么读? 从左到右读,和嵌套 for 循环的顺序一致。`[f(x,y) for x in A for y in B]` 等价于 `for x in A: for y in B: f(x,y)`。超过两层就该换普通循环了,没人能在脑内解析三层推导式。Python 之禅说得明白:可读性很重要。 ## 写段代码 ```python # 生成器只能遍历一次的坑 gen = (x**2 for x in range(5)) print(list(gen)) # [0, 1, 4, 9, 16] print(list(gen)) # [] ← 耗尽了! # 需要多次使用就转列表 squares = [x**2 for x in range(5)] print(squares[:3]) # [0, 1, 4] print(squares[3:]) # [9, 16] # 管道式处理用生成器省内存 lines = (line.strip() for line in open('big.log')) # 不读全文 errors = (line for line in lines if 'ERROR' in line) count = sum(1 for _ in errors) # 只计数,不存结果 ```
服务端5月29日 00:05
Python 面向对象的核心概念有哪些?MRO 和描述符怎么理解?Python 面向对象的核心是四件事:用类组织数据和行为的封装机制、通过继承复用代码、用多态让不同对象响应同一接口、以及 Python 自己的特殊之处——MRO、描述符、__slots__ 这些面试高频考点。基础概念(类/实例/属性/方法)不展开,下面只说容易踩坑和被追问的部分。 ## 追问 ### Python 的 MRO 是怎么排的?为什么不用深度优先? Python 3 用 C3 线性化算法计算 MRO。核心规则:子类排在父类前面,同一层按定义顺序排,不能违反前两条规定。为什么不用深度优先?因为菱形继承下深度优先会重复访问基类。经典例子:D 继承 B 和 C,B 和 C 都继承 A,深度优先的顺序是 D→B→A→C→A,A 被访问两次。C3 的结果是 D→B→C→A,每个类只出现一次,且 B 在 C 前面(定义顺序)。通过 `ClassName.__mro__` 可以查看任意类的解析顺序。 ### __slots__ 能省多少内存?有什么代价? 普通 Python 对象用 `__dict__` 存属性,一个空对象就要占 56 字节(64 位 CPython)。`__slots__` 用固定数组替代字典,属性直接按偏移量访问,省掉哈希表开销。实际测量:100 万个只有 name 和 age 属性的对象,用 `__dict__` 约 160MB,用 `__slots__` 约 48MB,省 70%。代价是不能再动态添加属性,而且继承时如果父类没有声明 `__slots__`,子类照样会有 `__dict__`,优化白做。实际项目中,Django 的 QuerySet 用了 `__slots__` 优化大量小对象。 ### 描述符是什么?property 和 classmethod 跟它什么关系? 描述符是实现了 `__get__`、`__set__`、`__delete__` 中任意一个的类。Python 的属性查找有个隐藏步骤:如果找到的对象是描述符,就调用它的 `__get__` 返回结果,而不是直接返回对象本身。`property` 就是描述符——你的 getter/setter 被 `__get__`/`__set__` 包装了;`classmethod` 也是描述符——它的 `__get__` 把类传给函数而不是实例。区分数据描述符(有 `__set__`)和非数据描述符(只有 `__get__`):数据描述符优先级高于实例 `__dict__`,非数据描述符优先级低于实例 `__dict__`。这就是为什么 property 能拦截赋值而普通方法不行。 ### __new__ 和 __init__ 有什么区别? `__new__` 创建对象并返回,`__init__` 初始化已创建的对象。`__new__` 是类方法(第一个参数是 cls),`__init__` 是实例方法(第一个参数是 self)。单例模式用 `__new__` 控制:如果 `_instance` 已存在就直接返回,不再创建新对象。`__init__` 做不到这点——它执行时对象已经创建了。另一个场景:不可变类型(str、int、tuple)的子类化必须重写 `__new__`,因为这些类型的对象在 `__new__` 阶段就已经确定了值,`__init__` 改不了。 ## 写段代码 ```python # 描述符实现懒加载属性 class LazyProperty: def __init__(self, func): self.func = func def __get__(self, obj, cls): if obj is None: return self value = self.func(obj) obj.__dict__[self.func.__name__] = value # 缓存到实例字典 return value class Data: @LazyProperty def expensive(self): print("计算中...") return sum(range(1000000)) d = Data() print(d.expensive) # 计算中... 499999500000 print(d.expensive) # 499999500000(不再计算,从 __dict__ 直接取) ```
服务端2月21日 17:10
Python 函数式编程有哪些特性和应用场景?# Python 函数式编程详解 ## 函数式编程的基本概念 函数式编程是一种编程范式,强调使用纯函数、避免可变状态和副作用。Python 虽然不是纯函数式语言,但提供了丰富的函数式编程工具。 ### 纯函数 纯函数是指相同的输入总是产生相同的输出,并且没有任何副作用。 ```python # 纯函数示例 def add(a, b): return a + b print(add(2, 3)) # 5 print(add(2, 3)) # 5 - 相同输入,相同输出 # 非纯函数示例 counter = 0 def increment(): global counter counter += 1 return counter print(increment()) # 1 print(increment()) # 2 - 相同输入,不同输出(有副作用) ``` ### 不可变数据 函数式编程倾向于使用不可变数据结构。 ```python # 不可变操作 original_list = [1, 2, 3] new_list = original_list + [4, 5] # 创建新列表,不修改原列表 print(original_list) # [1, 2, 3] print(new_list) # [1, 2, 3, 4, 5] # 可变操作(不推荐) original_list.append(4) # 修改原列表 print(original_list) # [1, 2, 3, 4] ``` ## 高阶函数 高阶函数是指接受函数作为参数或返回函数的函数。 ### map 函数 `map` 函数对可迭代对象的每个元素应用指定函数。 ```python # 基本用法 numbers = [1, 2, 3, 4, 5] squared = list(map(lambda x: x ** 2, numbers)) print(squared) # [1, 4, 9, 16, 25] # 使用命名函数 def square(x): return x ** 2 squared = list(map(square, numbers)) print(squared) # [1, 4, 9, 16, 25] # 多个可迭代对象 numbers1 = [1, 2, 3] numbers2 = [4, 5, 6] summed = list(map(lambda x, y: x + y, numbers1, numbers2)) print(summed) # [5, 7, 9] ``` ### filter 函数 `filter` 函数根据条件过滤可迭代对象的元素。 ```python # 基本用法 numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] even_numbers = list(filter(lambda x: x % 2 == 0, numbers)) print(even_numbers) # [2, 4, 6, 8, 10] # 使用命名函数 def is_even(x): return x % 2 == 0 even_numbers = list(filter(is_even, numbers)) print(even_numbers) # [2, 4, 6, 8, 10] # 过滤字符串 words = ["apple", "banana", "cherry", "date"] long_words = list(filter(lambda x: len(x) > 5, words)) print(long_words) # ['banana', 'cherry'] ``` ### reduce 函数 `reduce` 函数对可迭代对象的元素进行累积操作。 ```python from functools import reduce # 基本用法 numbers = [1, 2, 3, 4, 5] sum_result = reduce(lambda x, y: x + y, numbers) print(sum_result) # 15 # 计算乘积 product = reduce(lambda x, y: x * y, numbers) print(product) # 120 # 使用初始值 sum_with_initial = reduce(lambda x, y: x + y, numbers, 10) print(sum_with_initial) # 25 # 查找最大值 max_value = reduce(lambda x, y: x if x > y else y, numbers) print(max_value) # 5 ``` ### sorted 函数 `sorted` 函数对可迭代对象进行排序。 ```python # 基本排序 numbers = [3, 1, 4, 1, 5, 9, 2, 6] sorted_numbers = sorted(numbers) print(sorted_numbers) # [1, 1, 2, 3, 4, 5, 6, 9] # 降序排序 sorted_desc = sorted(numbers, reverse=True) print(sorted_desc) # [9, 6, 5, 4, 3, 2, 1, 1] # 按键排序 students = [ {"name": "Alice", "age": 25}, {"name": "Bob", "age": 20}, {"name": "Charlie", "age": 30} ] sorted_by_age = sorted(students, key=lambda x: x["age"]) print(sorted_by_age) # [{'name': 'Bob', 'age': 20}, {'name': 'Alice', 'age': 25}, {'name': 'Charlie', 'age': 30}] ``` ## Lambda 表达式 Lambda 表达式是匿名函数,适用于简单的函数定义。 ### 基本语法 ```python # Lambda 表达式 add = lambda x, y: x + y print(add(3, 5)) # 8 # 等价于 def add(x, y): return x + y ``` ### 实际应用 ```python # 与高阶函数结合使用 numbers = [1, 2, 3, 4, 5] squared = list(map(lambda x: x ** 2, numbers)) print(squared) # [1, 4, 9, 16, 25] # 排序 students = [("Alice", 25), ("Bob", 20), ("Charlie", 30)] sorted_students = sorted(students, key=lambda x: x[1]) print(sorted_students) # [('Bob', 20), ('Alice', 25), ('Charlie', 30)] # 条件表达式 get_grade = lambda score: "A" if score >= 90 else "B" if score >= 80 else "C" print(get_grade(95)) # A print(get_grade(85)) # B print(get_grade(75)) # C ``` ### Lambda 的限制 ```python # Lambda 只能包含表达式,不能包含语句 # 错误示例 # bad_lambda = lambda x: if x > 0: return x # 语法错误 # 正确做法 good_lambda = lambda x: x if x > 0 else 0 print(good_lambda(5)) # 5 print(good_lambda(-5)) # 0 ``` ## 装饰器 装饰器是高阶函数的一种应用,用于修改或增强函数的行为。 ### 基本装饰器 ```python def my_decorator(func): def wrapper(): print("函数执行前") func() print("函数执行后") return wrapper @my_decorator def say_hello(): print("Hello!") say_hello() # 输出: # 函数执行前 # Hello! # 函数执行后 ``` ### 带参数的装饰器 ```python def repeat(times): def decorator(func): def wrapper(*args, **kwargs): for _ in range(times): result = func(*args, **kwargs) return result return wrapper return decorator @repeat(3) def greet(name): print(f"Hello, {name}!") greet("Alice") # 输出: # Hello, Alice! # Hello, Alice! # Hello, Alice! ``` ### 保留函数元数据 ```python from functools import wraps def logging_decorator(func): @wraps(func) def wrapper(*args, **kwargs): print(f"调用函数: {func.__name__}") return func(*args, **kwargs) return wrapper @logging_decorator def calculate(x, y): """计算两个数的和""" return x + y print(calculate.__name__) # calculate print(calculate.__doc__) # 计算两个数的和 ``` ## 偏函数 偏函数固定函数的某些参数,创建新的函数。 ```python from functools import partial # 基本用法 def power(base, exponent): return base ** exponent square = partial(power, exponent=2) cube = partial(power, exponent=3) print(square(5)) # 25 print(cube(5)) # 125 # 实际应用 def greet(name, greeting, punctuation): return f"{greeting}, {name}{punctuation}" hello = partial(greet, greeting="Hello", punctuation="!") goodbye = partial(greet, greeting="Goodbye", punctuation=".") print(hello("Alice")) # Hello, Alice! print(goodbye("Bob")) # Goodbye, Bob. ``` ## 列表推导式与生成器表达式 ### 列表推导式 ```python # 基本用法 numbers = [1, 2, 3, 4, 5] squared = [x ** 2 for x in numbers] print(squared) # [1, 4, 9, 16, 25] # 带条件 even_squared = [x ** 2 for x in numbers if x % 2 == 0] print(even_squared) # [4, 16] # 嵌套 matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] flattened = [item for row in matrix for item in row] print(flattened) # [1, 2, 3, 4, 5, 6, 7, 8, 9] ``` ### 生成器表达式 ```python # 基本用法 numbers = (x ** 2 for x in range(10)) print(list(numbers)) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # 内存效率 # 列表推导式 - 占用大量内存 large_list = [x ** 2 for x in range(1000000)] # 生成器表达式 - 几乎不占用内存 large_gen = (x ** 2 for x in range(1000000)) ``` ## 实际应用场景 ### 1. 数据处理管道 ```python from functools import reduce # 处理数据管道 data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 过滤偶数 even = filter(lambda x: x % 2 == 0, data) # 平方 squared = map(lambda x: x ** 2, even) # 求和 result = reduce(lambda x, y: x + y, squared) print(result) # 220 ``` ### 2. 函数组合 ```python def compose(*functions): """组合多个函数""" def inner(arg): result = arg for func in reversed(functions): result = func(result) return result return inner # 定义函数 def add_one(x): return x + 1 def multiply_two(x): return x * 2 def square(x): return x ** 2 # 组合函数 pipeline = compose(square, multiply_two, add_one) print(pipeline(3)) # ((3 + 1) * 2) ** 2 = 64 ``` ### 3. 柯里化 ```python def curry(func): """柯里化函数""" def curried(*args): if len(args) >= func.__code__.co_argcount: return func(*args) return lambda *more_args: curried(*(args + more_args)) return curried @curry def add(a, b, c): return a + b + c add_1 = add(1) add_1_2 = add_1(2) result = add_1_2(3) print(result) # 6 # 也可以链式调用 result = add(1)(2)(3) print(result) # 6 ``` ### 4. 记忆化 ```python from functools import lru_cache # 使用 lru_cache 装饰器 @lru_cache(maxsize=128) def fibonacci(n): if n < 2: return n return fibonacci(n-1) + fibonacci(n-2) print(fibonacci(100)) # 快速计算 # 手动实现记忆化 def memoize(func): cache = {} def wrapper(*args): if args not in cache: cache[args] = func(*args) return cache[args] return wrapper @memoize def fibonacci_manual(n): if n < 2: return n return fibonacci_manual(n-1) + fibonacci_manual(n-2) print(fibonacci_manual(100)) # 快速计算 ``` ## 函数式编程的优势 ### 1. 可预测性 ```python # 纯函数的行为是可预测的 def calculate_discount(price, discount_rate): return price * (1 - discount_rate) print(calculate_discount(100, 0.2)) # 80.0 print(calculate_discount(100, 0.2)) # 80.0 - 总是相同 ``` ### 2. 可测试性 ```python # 纯函数易于测试 def add(a, b): return a + b # 测试 assert add(2, 3) == 5 assert add(-1, 1) == 0 assert add(0, 0) == 0 ``` ### 3. 并行性 ```python # 纯函数可以安全地并行执行 from concurrent.futures import ThreadPoolExecutor def process_item(item): return item ** 2 items = list(range(1000)) with ThreadPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_item, items)) ``` ### 4. 代码简洁性 ```python # 函数式风格更简洁 numbers = [1, 2, 3, 4, 5] # 命令式风格 squared = [] for num in numbers: squared.append(num ** 2) # 函数式风格 squared = list(map(lambda x: x ** 2, numbers)) ``` ## 最佳实践 ### 1. 优先使用纯函数 ```python # 好的做法 - 纯函数 def calculate_total(price, tax_rate): return price * (1 + tax_rate) # 不好的做法 - 有副作用 total = 0 def add_to_total(amount): global total total += amount ``` ### 2. 避免过度使用 Lambda ```python # 不好的做法 - 复杂的 Lambda complex_lambda = lambda x: x ** 2 if x > 0 else (x * 2 if x < 0 else 0) # 好的做法 - 使用命名函数 def process_number(x): if x > 0: return x ** 2 elif x < 0: return x * 2 else: return 0 ``` ### 3. 合理使用列表推导式 ```python # 简单情况 - 使用列表推导式 squared = [x ** 2 for x in range(10)] # 复杂情况 - 使用生成器或循环 def complex_process(data): for item in data: # 复杂的处理逻辑 processed = item * 2 if processed > 10: yield processed ``` ### 4. 使用内置函数 ```python # 好的做法 - 使用内置函数 numbers = [1, 2, 3, 4, 5] total = sum(numbers) maximum = max(numbers) minimum = min(numbers) # 不好的做法 - 手动实现 total = 0 for num in numbers: total += num ``` ## 总结 Python 函数式编程的核心概念: 1. **纯函数**:相同输入总是产生相同输出,无副作用 2. **不可变数据**:避免修改原始数据,创建新数据 3. **高阶函数**:接受或返回函数的函数(map, filter, reduce) 4. **Lambda 表达式**:匿名函数,适用于简单操作 5. **装饰器**:修改或增强函数行为 6. **偏函数**:固定函数参数,创建新函数 7. **列表推导式**:简洁地创建列表 8. **生成器表达式**:惰性求值,节省内存 函数式编程的优势: - 代码更简洁、更易读 - 更容易测试和调试 - 更好的并行性 - 减少副作用和状态管理 掌握函数式编程技巧,能够编写出更优雅、更高效的 Python 代码。
服务端2月21日 17:10
Python 元编程有哪些特性和应用场景?# Python 元编程详解 ## 元编程的基本概念 元编程是指编写能够操作、生成或修改代码的代码。Python 提供了丰富的元编程工具,包括装饰器、元类、动态属性等。 ### 元编程的应用场景 - 框架开发(如 Django ORM) - 代码生成和自动化 - 动态属性和方法创建 - 面向切面编程(AOP) - 序列化和反序列化 ## 元类(Metaclass) ### 什么是元类 元类是创建类的类,就像类是创建对象的模板一样,元类是创建类的模板。 ```python # 基本概念 class MyClass: pass # MyClass 是 type 的实例 print(type(MyClass)) # <class 'type'> # obj 是 MyClass 的实例 obj = MyClass() print(type(obj)) # <class '__main__.MyClass'> ``` ### 自定义元类 ```python class MyMeta(type): def __new__(cls, name, bases, namespace): # 在类创建时执行 print(f"Creating class: {name}") # 添加类属性 namespace['created_by'] = 'MyMeta' return super().__new__(cls, name, bases, namespace) class MyClass(metaclass=MyMeta): pass print(MyClass.created_by) # MyMeta ``` ### 元类的作用 ```python class SingletonMeta(type): """单例元类""" _instances = {} def __call__(cls, *args, **kwargs): if cls not in cls._instances: cls._instances[cls] = super().__call__(*args, **kwargs) return cls._instances[cls] class Singleton(metaclass=SingletonMeta): def __init__(self, value): self.value = value s1 = Singleton(1) s2 = Singleton(2) print(s1 is s2) # True print(s1.value) # 2 ``` ### 元类的高级用法 ```python class ValidateMeta(type): """验证元类""" def __new__(cls, name, bases, namespace): # 确保类有特定的属性 if 'required_attr' not in namespace: raise TypeError(f"{name} must have 'required_attr'") # 验证方法 for attr_name, attr_value in namespace.items(): if callable(attr_value) and not attr_name.startswith('_'): if not hasattr(attr_value, '__annotations__'): raise TypeError(f"Method {attr_name} must have type hints") return super().__new__(cls, name, bases, namespace) class ValidatedClass(metaclass=ValidateMeta): required_attr = "value" def method(self, x: int) -> int: return x * 2 # class InvalidClass(metaclass=ValidateMeta): # pass # TypeError: InvalidClass must have 'required_attr' ``` ## 动态属性和方法 ### 动态属性 ```python class DynamicAttributes: def __init__(self): self._data = {} def __getattr__(self, name): """访问不存在的属性时调用""" if name.startswith('get_'): attr_name = name[4:] return self._data.get(attr_name) raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") def __setattr__(self, name, value): """设置属性时调用""" if name.startswith('_'): super().__setattr__(name, value) else: self._data[name] = value def __delattr__(self, name): """删除属性时调用""" if name in self._data: del self._data[name] else: super().__delattr__(name) obj = DynamicAttributes() obj.name = "Alice" obj.age = 25 print(obj.get_name) # Alice print(obj.get_age) # 25 ``` ### 动态方法 ```python class DynamicMethods: def __init__(self): self.methods = {} def add_method(self, name, func): """动态添加方法""" self.methods[name] = func def __getattr__(self, name): """动态调用方法""" if name in self.methods: return self.methods[name] raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") obj = DynamicMethods() # 动态添加方法 obj.add_method('greet', lambda self, name: f"Hello, {name}!") obj.add_method('calculate', lambda self, x, y: x + y) print(obj.greet("Alice")) # Hello, Alice! print(obj.calculate(3, 5)) # 8 ``` ### 使用 types 模块创建方法 ```python import types class MyClass: pass def new_method(self): return "This is a dynamically added method" # 动态添加方法 MyClass.new_method = new_method obj = MyClass() print(obj.new_method()) # This is a dynamically added method # 使用 types.MethodType def another_method(self, value): return f"Value: {value}" obj.another_method = types.MethodType(another_method, obj) print(obj.another_method(42)) # Value: 42 ``` ## 描述符(Descriptor) ### 描述符协议 描述符是实现 `__get__`、`__set__` 和 `__delete__` 方法的类,用于控制属性的访问。 ```python class Descriptor: def __init__(self, name=None): self.name = name def __get__(self, instance, owner): if instance is None: return self return instance.__dict__.get(self.name, f"No {self.name} set") def __set__(self, instance, value): instance.__dict__[self.name] = value def __delete__(self, instance): if self.name in instance.__dict__: del instance.__dict__[self.name] class Person: name = Descriptor('name') age = Descriptor('age') person = Person() person.name = "Alice" person.age = 25 print(person.name) # Alice print(person.age) # 25 ``` ### 描述符的应用 ```python class ValidatedAttribute: """验证属性描述符""" def __init__(self, validator=None, default=None): self.validator = validator self.default = default self.name = None def __set_name__(self, owner, name): self.name = f"_{name}" def __get__(self, instance, owner): if instance is None: return self return getattr(instance, self.name, self.default) def __set__(self, instance, value): if self.validator and not self.validator(value): raise ValueError(f"Invalid value for {self.name}: {value}") setattr(instance, self.name, value) class User: name = ValidatedAttribute(lambda x: isinstance(x, str) and len(x) > 0) age = ValidatedAttribute(lambda x: isinstance(x, int) and 0 <= x <= 150) email = ValidatedAttribute(lambda x: '@' in x) user = User() user.name = "Alice" user.age = 25 user.email = "alice@example.com" print(user.name) # Alice print(user.age) # 25 # user.age = -5 # ValueError: Invalid value for _age: -5 ``` ## 属性装饰器 ### @property 装饰器 ```python class Temperature: def __init__(self, celsius): self._celsius = celsius @property def celsius(self): """获取摄氏温度""" return self._celsius @celsius.setter def celsius(self, value): """设置摄氏温度""" if value < -273.15: raise ValueError("Temperature below absolute zero") self._celsius = value @property def fahrenheit(self): """获取华氏温度(只读)""" return self._celsius * 9/5 + 32 temp = Temperature(25) print(temp.celsius) # 25 print(temp.fahrenheit) # 77.0 temp.celsius = 30 print(temp.celsius) # 30 # temp.fahrenheit = 100 # AttributeError: can't set attribute ``` ### 动态属性计算 ```python class Circle: def __init__(self, radius): self._radius = radius @property def radius(self): return self._radius @radius.setter def radius(self, value): if value <= 0: raise ValueError("Radius must be positive") self._radius = value @property def diameter(self): return self._radius * 2 @property def area(self): return 3.14159 * self._radius ** 2 @property def circumference(self): return 2 * 3.14159 * self._radius circle = Circle(5) print(circle.diameter) # 10 print(circle.area) # 78.53975 print(circle.circumference) # 31.4159 ``` ## 动态类创建 ### 使用 type 创建类 ```python # 动态创建类 def __init__(self, name): self.name = name def greet(self): return f"Hello, {self.name}!" # 使用 type 创建类 DynamicClass = type( 'DynamicClass', (object,), { '__init__': __init__, 'greet': greet, 'class_var': 'dynamic' } ) obj = DynamicClass("Alice") print(obj.greet()) # Hello, Alice! print(obj.class_var) # dynamic ``` ### 动态创建子类 ```python def create_subclass(base_class, subclass_name, extra_methods=None): """动态创建子类""" namespace = extra_methods or {} return type(subclass_name, (base_class,), namespace) class Base: def base_method(self): return "Base method" # 动态创建子类 extra_methods = { 'extra_method': lambda self: "Extra method" } SubClass = create_subclass(Base, 'SubClass', extra_methods) obj = SubClass() print(obj.base_method()) # Base method print(obj.extra_method()) # Extra method ``` ## 类装饰器 ### 基本类装饰器 ```python def add_class_method(cls): """添加类方法的装饰器""" @classmethod def class_method(cls): return f"Class method of {cls.__name__}" cls.class_method = class_method return cls @add_class_method class MyClass: pass print(MyClass.class_method()) # Class method of MyClass ``` ### 类装饰器的应用 ```python def singleton(cls): """单例类装饰器""" instances = {} def get_instance(*args, **kwargs): if cls not in instances: instances[cls] = cls(*args, **kwargs) return instances[cls] return get_instance @singleton class Database: def __init__(self): self.connection = "Connected" db1 = Database() db2 = Database() print(db1 is db2) # True ``` ### 参数化类装饰器 ```python def add_attributes(**attrs): """添加类属性的装饰器""" def decorator(cls): for name, value in attrs.items(): setattr(cls, name, value) return cls return decorator @add_attributes(version="1.0", author="Alice") class MyClass: pass print(MyClass.version) # 1.0 print(MyClass.author) # Alice ``` ## 实际应用场景 ### 1. ORM 框架 ```python class Field: """字段描述符""" def __init__(self, field_type, primary_key=False): self.field_type = field_type self.primary_key = primary_key self.name = None def __set_name__(self, owner, name): self.name = name def __get__(self, instance, owner): if instance is None: return self return instance.__dict__.get(self.name) def __set__(self, instance, value): if not isinstance(value, self.field_type): raise TypeError(f"Expected {self.field_type}, got {type(value)}") instance.__dict__[self.name] = value class ModelMeta(type): """模型元类""" def __new__(cls, name, bases, namespace): # 收集字段 fields = {} for key, value in namespace.items(): if isinstance(value, Field): fields[key] = value namespace['_fields'] = fields return super().__new__(cls, name, bases, namespace) class Model(metaclass=ModelMeta): def __init__(self, **kwargs): for name, value in kwargs.items(): setattr(self, name, value) class User(Model): id = Field(int, primary_key=True) name = Field(str) age = Field(int) user = User(id=1, name="Alice", age=25) print(user.name) # Alice print(user.age) # 25 ``` ### 2. API 响应验证 ```python class ValidatedResponse: """验证响应类""" def __init__(self, schema): self.schema = schema def __call__(self, cls): def __init__(self, data): self.validate(data) for key, value in data.items(): setattr(self, key, value) def validate(self, data): for field, field_type in self.schema.items(): if field not in data: raise ValueError(f"Missing field: {field}") if not isinstance(data[field], field_type): raise TypeError(f"Invalid type for {field}") cls.__init__ = __init__ cls.validate = validate return cls @ValidatedResponse({'name': str, 'age': int, 'email': str}) class UserResponse: pass user_data = {'name': 'Alice', 'age': 25, 'email': 'alice@example.com'} user = UserResponse(user_data) print(user.name) # Alice ``` ### 3. 动态表单生成 ```python class FormField: """表单字段""" def __init__(self, field_type, required=False, default=None): self.field_type = field_type self.required = required self.default = default self.name = None def __set_name__(self, owner, name): self.name = name def validate(self, value): if self.required and value is None: raise ValueError(f"{self.name} is required") if value is not None and not isinstance(value, self.field_type): raise TypeError(f"Invalid type for {self.name}") return True class FormMeta(type): """表单元类""" def __new__(cls, name, bases, namespace): fields = {} for key, value in namespace.items(): if isinstance(value, FormField): fields[key] = value namespace['_fields'] = fields return super().__new__(cls, name, bases, namespace) class Form(metaclass=FormMeta): def __init__(self, **kwargs): for name, field in self._fields.items(): value = kwargs.get(name, field.default) field.validate(value) setattr(self, name, value) def to_dict(self): return {name: getattr(self, name) for name in self._fields} class UserForm(Form): name = FormField(str, required=True) age = FormField(int, default=18) email = FormField(str, required=True) form = UserForm(name="Alice", email="alice@example.com") print(form.to_dict()) # {'name': 'Alice', 'age': 18, 'email': 'alice@example.com'} ``` ## 最佳实践 ### 1. 谨慎使用元类 ```python # 不好的做法 - 过度使用元类 class ComplexMeta(type): def __new__(cls, name, bases, namespace): # 复杂的元类逻辑 pass # 好的做法 - 使用类装饰器 def add_functionality(cls): # 添加功能 return cls @add_functionality class SimpleClass: pass ``` ### 2. 优先使用描述符而非 __getattr__ ```python # 好的做法 - 使用描述符 class ValidatedField: def __get__(self, instance, owner): return instance.__dict__.get(self.name) def __set__(self, instance, value): instance.__dict__[self.name] = value class MyClass: field = ValidatedField() # 不好的做法 - 使用 __getattr__ class BadClass: def __getattr__(self, name): return self.__dict__.get(name) ``` ### 3. 提供清晰的文档 ```python class MyMeta(type): """自定义元类,用于添加类级别的功能 这个元类会自动为所有类添加 created_at 属性 """ def __new__(cls, name, bases, namespace): namespace['created_at'] = datetime.now() return super().__new__(cls, name, bases, namespace) ``` ### 4. 考虑性能影响 ```python # 缓存属性访问 class CachedProperty: def __init__(self, func): self.func = func self.name = func.__name__ def __get__(self, instance, owner): if instance is None: return self if not hasattr(instance, f'_{self.name}'): setattr(instance, f'_{self.name}', self.func(instance)) return getattr(instance, f'_{self.name}') class MyClass: @CachedProperty def expensive_computation(self): # 耗时计算 return sum(range(1000000)) ``` ## 总结 Python 元编程的核心概念: 1. **元类**:创建类的类,控制类的创建过程 2. **动态属性**:使用 `__getattr__`、`__setattr__` 等方法动态管理属性 3. **动态方法**:运行时添加和修改方法 4. **描述符**:控制属性的访问和修改 5. **属性装饰器**:使用 `@property` 创建计算属性 6. **动态类创建**:使用 `type` 函数动态创建类 7. **类装饰器**:修改或增强类的行为 元编程的应用场景: - 框架开发(ORM、表单验证) - 代码生成和自动化 - 动态 API 创建 - 序列化和反序列化 - 面向切面编程 元编程的注意事项: - 谨慎使用,避免过度设计 - 提供清晰的文档和示例 - 考虑性能影响 - 优先使用简单的解决方案 掌握元编程技巧,能够编写出更灵活、更强大的 Python 代码。
服务端2月17日 22:19
Python 性能优化有哪些技巧和最佳实践?## 性能分析工具 ### 1. timeit 模块 `timeit` 模块用于测量小段代码的执行时间。 ```python import timeit # 测量代码执行时间 code = """ sum(range(1000)) """ execution_time = timeit.timeit(code, number=1000) print(f"执行时间: {execution_time:.4f} 秒") # 使用 timeit 装饰器 @timeit.timeit def test_function(): return sum(range(1000)) test_function() ``` ### 2. cProfile 模块 `cProfile` 模块用于分析程序的性能瓶颈。 ```python import cProfile def slow_function(): total = 0 for i in range(1000000): total += i return total def fast_function(): return sum(range(1000000)) def main(): slow_function() fast_function() # 性能分析 cProfile.run('main()') # 输出分析结果到文件 cProfile.run('main()', filename='profile_stats') ``` ### 3. memory\_profiler `memory_profiler` 用于分析内存使用情况。 ```python # 安装: pip install memory-profiler from memory_profiler import profile @profile def memory_intensive_function(): data = [i for i in range(1000000)] return sum(data) if __name__ == '__main__': memory_intensive_function() ``` ### 4. line\_profiler `line_profiler` 用于逐行分析函数性能。 ```python # 安装: pip install line_profiler from line_profiler import LineProfiler def complex_function(): result = [] for i in range(1000): result.append(i * 2) return sum(result) # 创建性能分析器 lp = LineProfiler() lp_wrapper = lp(complex_function) lp_wrapper() # 显示结果 lp.print_stats() ``` ## 算法优化 ### 1. 选择合适的算法 ```python # 不好的做法 - O(n²) 复杂度 def find_duplicates_slow(arr): duplicates = [] for i in range(len(arr)): for j in range(i + 1, len(arr)): if arr[i] == arr[j] and arr[i] not in duplicates: duplicates.append(arr[i]) return duplicates # 好的做法 - O(n) 复杂度 def find_duplicates_fast(arr): seen = set() duplicates = set() for item in arr: if item in seen: duplicates.add(item) else: seen.add(item) return list(duplicates) ``` ### 2. 使用内置函数 ```python # 不好的做法 - 手动实现 def manual_sum(arr): total = 0 for item in arr: total += item return total # 好的做法 - 使用内置函数 def builtin_sum(arr): return sum(arr) # 性能对比 import timeit print(timeit.timeit(lambda: manual_sum(range(10000)), number=100)) print(timeit.timeit(lambda: builtin_sum(range(10000)), number=100)) ``` ### 3. 避免不必要的计算 ```python # 不好的做法 - 重复计算 def calculate_distances(points): distances = [] for i in range(len(points)): for j in range(len(points)): dx = points[j][0] - points[i][0] dy = points[j][1] - points[i][1] distances.append((dx ** 2 + dy ** 2) ** 0.5) return distances # 好的做法 - 避免重复计算 def calculate_distances_optimized(points): distances = [] for i in range(len(points)): for j in range(i + 1, len(points)): dx = points[j][0] - points[i][0] dy = points[j][1] - points[i][1] distances.append((dx ** 2 + dy ** 2) ** 0.5) return distances ``` ## 数据结构优化 ### 1. 使用合适的数据结构 ```python # 列表查找 - O(n) def find_in_list(lst, target): return target in lst # 集合查找 - O(1) def find_in_set(s, target): return target in s # 性能对比 import timeit lst = list(range(10000)) s = set(range(10000)) print("列表查找:", timeit.timeit(lambda: find_in_list(lst, 5000), number=1000)) print("集合查找:", timeit.timeit(lambda: find_in_set(s, 5000), number=1000)) ``` ### 2. 使用生成器替代列表 ```python # 不好的做法 - 使用列表 def get_squares_list(n): return [i ** 2 for i in range(n)] # 好的做法 - 使用生成器 def get_squares_generator(n): for i in range(n): yield i ** 2 # 内存使用对比 import sys list_obj = get_squares_list(1000000) gen_obj = get_squares_generator(1000000) print(f"列表内存: {sys.getsizeof(list_obj)} 字节") print(f"生成器内存: {sys.getsizeof(gen_obj)} 字节") ``` ### 3. 使用 **slots** 减少内存 ```python class Person: def __init__(self, name, age): self.name = name self.age = age class PersonWithSlots: __slots__ = ['name', 'age'] def __init__(self, name, age): self.name = name self.age = age # 内存对比 import sys p1 = Person("Alice", 25) p2 = PersonWithSlots("Alice", 25) print(f"普通对象: {sys.getsizeof(p1)} 字节") print(f"使用 __slots__: {sys.getsizeof(p2)} 字节") ``` ## I/O 优化 ### 1. 批量处理 I/O ```python # 不好的做法 - 逐行写入 def write_lines_slow(filename, lines): with open(filename, 'w') as f: for line in lines: f.write(line + '\n') # 好的做法 - 批量写入 def write_lines_fast(filename, lines): with open(filename, 'w') as f: f.write('\n'.join(lines)) ``` ### 2. 使用缓冲 ```python # 不好的做法 - 无缓冲 def read_without_buffer(filename): with open(filename, 'r', buffering=0) as f: return f.read() # 好的做法 - 使用缓冲 def read_with_buffer(filename): with open(filename, 'r', buffering=8192) as f: return f.read() ``` ### 3. 异步 I/O ```python import asyncio import aiohttp async def fetch_url(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() async def fetch_all_urls(urls): tasks = [fetch_url(url) for url in urls] return await asyncio.gather(*tasks) urls = [ "https://www.example.com", "https://www.google.com", "https://www.github.com", ] # 异步获取所有 URL results = asyncio.run(fetch_all_urls(urls)) ``` ## 并发优化 ### 1. 多进程处理 CPU 密集型任务 ```python import multiprocessing def process_data(data_chunk): return sum(x ** 2 for x in data_chunk) def parallel_processing(data, num_processes=4): chunk_size = len(data) // num_processes chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] with multiprocessing.Pool(processes=num_processes) as pool: results = pool.map(process_data, chunks) return sum(results) data = list(range(1000000)) result = parallel_processing(data) ``` ### 2. 多线程处理 I/O 密集型任务 ```python import threading import requests def download_url(url): response = requests.get(url) return len(response.content) def parallel_download(urls): threads = [] results = [] def worker(url): result = download_url(url) results.append(result) for url in urls: thread = threading.Thread(target=worker, args=(url,)) threads.append(thread) thread.start() for thread in threads: thread.join() return results urls = ["url1", "url2", "url3"] results = parallel_download(urls) ``` ### 3. 使用 concurrent.futures ```python from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor def process_item(item): return item ** 2 def with_thread_pool(items): with ThreadPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_item, items)) return results def with_process_pool(items): with ProcessPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_item, items)) return results items = list(range(1000)) thread_results = with_thread_pool(items) process_results = with_process_pool(items) ``` ## 缓存优化 ### 1. 使用 functools.lru\_cache ```python from functools import lru_cache @lru_cache(maxsize=128) def fibonacci(n): if n < 2: return n return fibonacci(n-1) + fibonacci(n-2) # 快速计算 print(fibonacci(100)) ``` ### 2. 自定义缓存 ```python class Cache: def __init__(self, max_size=128): self.cache = {} self.max_size = max_size def get(self, key): return self.cache.get(key) def set(self, key, value): if len(self.cache) >= self.max_size: self.cache.pop(next(iter(self.cache))) self.cache[key] = value cache = Cache() def expensive_computation(x): cached_result = cache.get(x) if cached_result is not None: return cached_result result = sum(i ** 2 for i in range(x)) cache.set(x, result) return result ``` ### 3. 使用 Redis 缓存 ```python import redis import pickle # 连接 Redis r = redis.Redis(host='localhost', port=6379, db=0) def cache_result(key, value, ttl=3600): """缓存结果""" r.setex(key, ttl, pickle.dumps(value)) def get_cached_result(key): """获取缓存结果""" result = r.get(key) if result: return pickle.loads(result) return None def expensive_operation(data): cache_key = f"result:{hash(str(data))}" # 尝试从缓存获取 cached = get_cached_result(cache_key) if cached: return cached # 执行计算 result = complex_computation(data) # 缓存结果 cache_result(cache_key, result) return result ``` ## 字符串优化 ### 1. 使用 join 替代 + ```python # 不好的做法 - 使用 + def build_string_slow(parts): result = "" for part in parts: result += part return result # 好的做法 - 使用 join def build_string_fast(parts): return ''.join(parts) # 性能对比 import timeit parts = ["part"] * 1000 print(timeit.timeit(lambda: build_string_slow(parts), number=100)) print(timeit.timeit(lambda: build_string_fast(parts), number=100)) ``` ### 2. 使用字符串格式化 ```python # 不好的做法 - 字符串拼接 def format_message_slow(name, age): return "Name: " + name + ", Age: " + str(age) # 好的做法 - 使用 f-string def format_message_fast(name, age): return f"Name: {name}, Age: {age}" # 性能对比 print(timeit.timeit(lambda: format_message_slow("Alice", 25), number=10000)) print(timeit.timeit(lambda: format_message_fast("Alice", 25), number=10000)) ``` ### 3. 使用字符串方法 ```python # 不好的做法 - 手动处理 def process_string_slow(s): result = "" for char in s: if char.isupper(): result += char.lower() else: result += char return result # 好的做法 - 使用内置方法 def process_string_fast(s): return s.lower() # 性能对比 print(timeit.timeit(lambda: process_string_slow("HELLO"), number=10000)) print(timeit.timeit(lambda: process_string_fast("HELLO"), number=10000)) ``` ## 数据库优化 ### 1. 使用连接池 ```python from sqlalchemy import create_engine from sqlalchemy.pool import QueuePool # 创建连接池 engine = create_engine( 'postgresql://user:password@localhost/dbname', poolclass=QueuePool, pool_size=10, max_overflow=5 ) def execute_query(query): with engine.connect() as connection: result = connection.execute(query) return result.fetchall() ``` ### 2. 批量插入 ```python # 不好的做法 - 逐条插入 def insert_slow(items): for item in items: db.execute("INSERT INTO table VALUES (%s)", (item,)) # 好的做法 - 批量插入 def insert_fast(items): db.executemany("INSERT INTO table VALUES (%s)", [(item,) for item in items]) ``` ### 3. 使用索引 ```python # 创建索引 CREATE INDEX idx_name ON users(name); # 使用索引查询 SELECT * FROM users WHERE name = 'Alice'; # 避免全表扫描 # 不好的做法 SELECT * FROM users WHERE LOWER(name) = 'alice'; # 好的做法 SELECT * FROM users WHERE name = 'Alice'; ``` ## 最佳实践 ### 1. 预先分配内存 ```python # 不好的做法 - 动态增长 def build_list_slow(): result = [] for i in range(10000): result.append(i) return result # 好的做法 - 预先分配 def build_list_fast(): return [i for i in range(10000)] ``` ### 2. 避免全局变量 ```python # 不好的做法 - 使用全局变量 counter = 0 def increment_global(): global counter counter += 1 # 好的做法 - 使用局部变量 def increment_local(counter): return counter + 1 ``` ### 3. 使用适当的数据类型 ```python # 不好的做法 - 使用列表存储数值 numbers = [1, 2, 3, 4, 5] # 好的做法 - 使用数组 import array numbers = array.array('i', [1, 2, 3, 4, 5]) # 不好的做法 - 使用字符串存储二进制数据 data = "binary data" # 好的做法 - 使用字节串 data = b"binary data" ``` ### 4. 延迟加载 ```python # 不好的做法 - 立即加载所有数据 def load_all_data(): data = [] for item in large_dataset: processed = process_item(item) data.append(processed) return data # 好的做法 - 延迟加载 def load_data_lazy(): for item in large_dataset: yield process_item(item) ``` ## 性能监控 ### 1. 使用 logging 记录性能 ```python import logging import time logging.basicConfig(level=logging.INFO) def logged_function(func): def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) end_time = time.time() logging.info(f"{func.__name__} 执行时间: {end_time - start_time:.4f} 秒") return result return wrapper @logged_function def expensive_function(): time.sleep(1) return "Done" expensive_function() ``` ### 2. 使用性能计数器 ```python import time from collections import defaultdict class PerformanceMonitor: def __init__(self): self.counters = defaultdict(list) def record(self, name, duration): self.counters[name].append(duration) def get_stats(self, name): durations = self.counters[name] return { 'count': len(durations), 'total': sum(durations), 'average': sum(durations) / len(durations), 'min': min(durations), 'max': max(durations) } monitor = PerformanceMonitor() def monitored_function(func): def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) end_time = time.time() monitor.record(func.__name__, end_time - start_time) return result return wrapper ``` ## 总结 Python 性能优化的关键点: 1. **性能分析工具**:timeit、cProfile、memory\_profiler、line\_profiler 2. **算法优化**:选择合适的算法、使用内置函数、避免不必要的计算 3. **数据结构优化**:使用合适的数据结构、使用生成器、使用 **slots** 4. **I/O 优化**:批量处理、使用缓冲、异步 I/O 5. **并发优化**:多进程、多线程、concurrent.futures 6. **缓存优化**:lru\_cache、自定义缓存、Redis 缓存 7. **字符串优化**:使用 join、字符串格式化、字符串方法 8. **数据库优化**:连接池、批量插入、使用索引 9. **最佳实践**:预先分配内存、避免全局变量、使用适当的数据类型、延迟加载 10. **性能监控**:logging、性能计数器 性能优化原则: * 先测量,后优化 * 优化瓶颈,而非所有代码 * 权衡可读性和性能 * 使用内置函数和库 * 考虑使用 C 扩展或 Cython 掌握性能优化技巧,能够编写出更高效、更快速的 Python 程序。
服务端2月17日 21:48
Python 中的深拷贝和浅拷贝有什么区别?## 拷贝的基本概念 在 Python 中,赋值操作不会创建新的对象,而是创建对同一对象的引用。拷贝操作则创建新的对象。 ### 赋值 vs 拷贝 ```python # 赋值操作 original = [1, 2, 3] assigned = original assigned[0] = 99 print(original) # [99, 2, 3] - 原始列表被修改 # 拷贝操作 import copy original = [1, 2, 3] copied = copy.copy(original) copied[0] = 99 print(original) # [1, 2, 3] - 原始列表未被修改 ``` ## 浅拷贝(Shallow Copy) ### 什么是浅拷贝 浅拷贝创建一个新的对象,但不会递归地复制嵌套的对象。嵌套的对象仍然是共享的引用。 ### 浅拷贝的实现方式 ```python import copy # 1. 使用 copy.copy() original = [1, 2, [3, 4]] shallow = copy.copy(original) # 2. 使用列表的 copy() 方法 shallow = original.copy() # 3. 使用切片 shallow = original[:] # 4. 使用 list() 构造函数 shallow = list(original) # 5. 使用字典的 copy() 方法 original_dict = {'a': 1, 'b': [2, 3]} shallow_dict = original_dict.copy() ``` ### 浅拷贝的问题 ```python import copy original = [1, 2, [3, 4]] shallow = copy.copy(original) # 修改顶层元素 shallow[0] = 99 print(original) # [1, 2, [3, 4]] - 原始列表未被修改 # 修改嵌套对象 shallow[2][0] = 99 print(original) # [1, 2, [99, 4]] - 原始列表被修改! ``` ## 深拷贝(Deep Copy) ### 什么是深拷贝 深拷贝创建一个新的对象,并递归地复制所有嵌套的对象。修改拷贝不会影响原始对象。 ### 深拷贝的实现方式 ```python import copy original = [1, 2, [3, 4]] deep = copy.deepcopy(original) # 修改顶层元素 deep[0] = 99 print(original) # [1, 2, [3, 4]] - 原始列表未被修改 # 修改嵌套对象 deep[2][0] = 99 print(original) # [1, 2, [3, 4]] - 原始列表未被修改 ``` ## 深拷贝与浅拷贝的对比 ### 基本数据类型 ```python import copy # 不可变对象(整数、字符串、元组) a = 42 b = copy.copy(a) c = copy.deepcopy(a) print(a is b) # True - 不可变对象共享引用 print(a is c) # True # 可变对象(列表、字典、集合) original = [1, 2, 3] shallow = copy.copy(original) deep = copy.deepcopy(original) print(original is shallow) # False - 创建了新对象 print(original is deep) # False - 创建了新对象 ``` ### 嵌套结构 ```python import copy original = { 'numbers': [1, 2, 3], 'nested': {'a': [4, 5], 'b': [6, 7]}, 'tuple': (8, 9, [10, 11]) } shallow = copy.copy(original) deep = copy.deepcopy(original) # 修改浅拷贝的嵌套列表 shallow['numbers'][0] = 99 print(original['numbers'][0]) # 99 - 原始对象被修改 # 修改深拷贝的嵌套列表 deep['numbers'][0] = 88 print(original['numbers'][0]) # 99 - 原始对象未被修改 ``` ### 自定义对象的拷贝 ```python import copy class MyClass: def __init__(self, value): self.value = value self.nested = [value * 2, value * 3] def __copy__(self): """实现浅拷贝""" new_obj = type(self)(self.value) new_obj.nested = self.nested return new_obj def __deepcopy__(self, memo): """实现深拷贝""" new_obj = type(self)(self.value) new_obj.nested = copy.deepcopy(self.nested, memo) return new_obj original = MyClass(10) shallow = copy.copy(original) deep = copy.deepcopy(original) shallow.nested[0] = 99 print(original.nested[0]) # 99 - 浅拷贝共享嵌套对象 deep.nested[0] = 88 print(original.nested[0]) # 99 - 深拷贝独立 ``` ## 实际应用场景 ### 1. 处理配置对象 ```python import copy default_config = { 'debug': False, 'max_retries': 3, 'timeout': 30, 'endpoints': ['api1.example.com', 'api2.example.com'] } # 使用深拷贝创建独立配置 config1 = copy.deepcopy(default_config) config2 = copy.deepcopy(default_config) config1['debug'] = True config1['endpoints'].append('api3.example.com') print(default_config['debug']) # False print(default_config['endpoints']) # ['api1.example.com', 'api2.example.com'] ``` ### 2. 处理数据结构 ```python import copy # 处理嵌套数据 data = { 'users': [ {'name': 'Alice', 'scores': [85, 90, 78]}, {'name': 'Bob', 'scores': [92, 88, 95]} ] } # 创建副本进行处理 processed_data = copy.deepcopy(data) # 修改副本不影响原始数据 for user in processed_data['users']: user['average'] = sum(user['scores']) / len(user['scores']) print(processed_data['users'][0]['average']) # 84.333... print('average' in data['users'][0]) # False ``` ### 3. 实现撤销/重做功能 ```python import copy class TextEditor: def __init__(self): self.content = "" self.history = [] def write(self, text): self.history.append(copy.deepcopy(self.content)) self.content += text def undo(self): if self.history: self.content = self.history.pop() def get_content(self): return self.content editor = TextEditor() editor.write("Hello ") editor.write("World!") print(editor.get_content()) # Hello World! editor.undo() print(editor.get_content()) # Hello ``` ### 4. 缓存数据 ```python import copy class DataCache: def __init__(self): self.cache = {} def get(self, key): if key in self.cache: return copy.deepcopy(self.cache[key]) return None def set(self, key, value): self.cache[key] = value cache = DataCache() data = {'items': [1, 2, 3]} cache.set('data', data) # 获取缓存数据的副本 cached_data = cache.get('data') cached_data['items'].append(4) # 原始缓存数据未被修改 original_data = cache.get('data') print(original_data['items']) # [1, 2, 3] ``` ## 性能考虑 ### 深拷贝的性能开销 ```python import copy import time # 大型数据结构 large_data = {'items': list(range(10000))} # 浅拷贝 start = time.time() shallow = copy.copy(large_data) print(f"浅拷贝耗时: {time.time() - start:.6f} 秒") # 深拷贝 start = time.time() deep = copy.deepcopy(large_data) print(f"深拷贝耗时: {time.time() - start:.6f} 秒") ``` ### 选择合适的拷贝方式 ```python import copy # 简单数据结构 - 使用浅拷贝 simple_data = [1, 2, 3, 4, 5] shallow_copy = copy.copy(simple_data) # 嵌套数据结构 - 使用深拷贝 complex_data = [1, 2, [3, 4], {'a': 5}] deep_copy = copy.deepcopy(complex_data) # 只读数据 - 不需要拷贝 read_only_data = (1, 2, 3) # 元组是不可变的 ``` ## 常见问题与解决方案 ### 1. 循环引用 ```python import copy # 创建循环引用 a = [1, 2] b = [3, 4] a.append(b) b.append(a) # 深拷贝处理循环引用 try: deep_copy = copy.deepcopy(a) print("深拷贝成功处理循环引用") except RecursionError: print("无法处理循环引用") ``` ### 2. 自定义对象的拷贝 ```python import copy class Node: def __init__(self, value): self.value = value self.next = None def __deepcopy__(self, memo): new_node = Node(self.value) memo[id(self)] = new_node if self.next: new_node.next = copy.deepcopy(self.next, memo) return new_node # 创建链表 node1 = Node(1) node2 = Node(2) node3 = Node(3) node1.next = node2 node2.next = node3 # 深拷贝链表 copied_list = copy.deepcopy(node1) print(copied_list.value) # 1 print(copied_list.next.value) # 2 ``` ### 3. 拷贝不可变对象 ```python import copy # 不可变对象不需要拷贝 immutable = (1, 2, 3) shallow = copy.copy(immutable) deep = copy.deepcopy(immutable) print(immutable is shallow) # True print(immutable is deep) # True ``` ## 最佳实践 ### 1. 明确拷贝需求 ```python import copy # 需要独立修改嵌套对象 - 使用深拷贝 data = {'config': {'timeout': 30}} independent_copy = copy.deepcopy(data) # 只需要修改顶层对象 - 使用浅拷贝 data = [1, 2, 3, 4, 5] shallow_copy = copy.copy(data) ``` ### 2. 避免不必要的拷贝 ```python import copy # 不好的做法 - 不必要的拷贝 def process_data(data): copied = copy.deepcopy(data) return sum(copied) # 好的做法 - 直接使用原始数据 def process_data(data): return sum(data) ``` ### 3. 使用上下文管理器 ```python import copy from contextlib import contextmanager @contextmanager def copy_context(data, deep=False): """创建拷贝上下文""" copied = copy.deepcopy(data) if deep else copy.copy(data) yield copied # 使用上下文管理器 original = [1, 2, [3, 4]] with copy_context(original, deep=True) as copied: copied[2][0] = 99 print(original) # [1, 2, [3, 4]] - 原始数据未被修改 ``` ### 4. 文档化拷贝行为 ```python import copy class DataProcessor: """数据处理类 注意:process_data 方法会修改输入数据,如需保留原始数据, 请在调用前使用 copy.deepcopy() 创建副本。 """ def process_data(self, data): data[0] = 99 return data # 使用示例 processor = DataProcessor() original = [1, 2, 3] processed = processor.process_data(copy.deepcopy(original)) ``` ## 总结 深拷贝与浅拷贝的关键区别: ### 浅拷贝 * 创建新对象,但嵌套对象共享引用 * 使用 `copy.copy()` 或对象的 `copy()` 方法 * 适用于简单数据结构或不需要修改嵌套对象的场景 * 性能开销较小 ### 深拷贝 * 创建新对象,递归复制所有嵌套对象 * 使用 `copy.deepcopy()` * 适用于复杂嵌套数据结构 * 性能开销较大 ### 选择建议 1. **简单数据结构**:使用浅拷贝 2. **嵌套数据结构**:使用深拷贝 3. **只读数据**:不需要拷贝 4. **性能敏感**:避免不必要的拷贝 5. **自定义对象**:实现 `__copy__` 和 `__deepcopy__` 方法 理解深拷贝与浅拷贝的区别,能够正确处理数据复制,避免意外的数据修改问题。