
What are the techniques and best practices for Python performance optimization?


Profiling Tools

1. The timeit module

The timeit module measures the execution time of small code snippets.

```python
import timeit

# measure how long a snippet takes to run
code = """
sum(range(1000))
"""
execution_time = timeit.timeit(code, number=1000)
print(f"Execution time: {execution_time:.4f} seconds")

# timeit.timeit also accepts a callable directly
# (note: the module provides no decorator)
def test_function():
    return sum(range(1000))

print(f"Execution time: {timeit.timeit(test_function, number=1000):.4f} seconds")
```

2. The cProfile module

The cProfile module identifies performance bottlenecks in a program.

```python
import cProfile

def slow_function():
    total = 0
    for i in range(1000000):
        total += i
    return total

def fast_function():
    return sum(range(1000000))

def main():
    slow_function()
    fast_function()

# profile the call and print the report
cProfile.run('main()')

# write the profiling results to a file instead
cProfile.run('main()', filename='profile_stats')
```

3. memory_profiler

memory_profiler analyzes memory usage.

```python
# install with: pip install memory-profiler
from memory_profiler import profile

@profile  # prints a line-by-line memory report when the function runs
def memory_intensive_function():
    data = [i for i in range(1000000)]
    return sum(data)

if __name__ == '__main__':
    memory_intensive_function()
```

4. line_profiler

line_profiler profiles a function line by line.

```python
# install with: pip install line_profiler
from line_profiler import LineProfiler

def complex_function():
    result = []
    for i in range(1000):
        result.append(i * 2)
    return sum(result)

# create the profiler and wrap the target function
lp = LineProfiler()
lp_wrapper = lp(complex_function)
lp_wrapper()

# print per-line timings
lp.print_stats()
```

Algorithm Optimization

1. Choose the right algorithm

```python
# bad - O(n²) complexity
def find_duplicates_slow(arr):
    duplicates = []
    for i in range(len(arr)):
        for j in range(i + 1, len(arr)):
            if arr[i] == arr[j] and arr[i] not in duplicates:
                duplicates.append(arr[i])
    return duplicates

# good - O(n) complexity
def find_duplicates_fast(arr):
    seen = set()
    duplicates = set()
    for item in arr:
        if item in seen:
            duplicates.add(item)
        else:
            seen.add(item)
    return list(duplicates)
```

2. Use built-in functions

```python
# bad - reimplementing by hand
def manual_sum(arr):
    total = 0
    for item in arr:
        total += item
    return total

# good - use the built-in, implemented in C
def builtin_sum(arr):
    return sum(arr)

# performance comparison
import timeit
print(timeit.timeit(lambda: manual_sum(range(10000)), number=100))
print(timeit.timeit(lambda: builtin_sum(range(10000)), number=100))
```

3. Avoid unnecessary computation

```python
# bad - every pair is computed twice, plus the zero distance for i == j
def calculate_distances(points):
    distances = []
    for i in range(len(points)):
        for j in range(len(points)):
            dx = points[j][0] - points[i][0]
            dy = points[j][1] - points[i][1]
            distances.append((dx ** 2 + dy ** 2) ** 0.5)
    return distances

# good - compute each unordered pair exactly once
def calculate_distances_optimized(points):
    distances = []
    for i in range(len(points)):
        for j in range(i + 1, len(points)):
            dx = points[j][0] - points[i][0]
            dy = points[j][1] - points[i][1]
            distances.append((dx ** 2 + dy ** 2) ** 0.5)
    return distances
```

Data Structure Optimization

1. Use the right data structure

```python
# list lookup - O(n)
def find_in_list(lst, target):
    return target in lst

# set lookup - O(1) on average
def find_in_set(s, target):
    return target in s

# performance comparison
import timeit
lst = list(range(10000))
s = set(range(10000))
print("List lookup:", timeit.timeit(lambda: find_in_list(lst, 5000), number=1000))
print("Set lookup:", timeit.timeit(lambda: find_in_set(s, 5000), number=1000))
```

2. Use generators instead of lists

```python
# bad - materializes the whole list in memory
def get_squares_list(n):
    return [i ** 2 for i in range(n)]

# good - a generator yields values on demand
def get_squares_generator(n):
    for i in range(n):
        yield i ** 2

# memory comparison
import sys
list_obj = get_squares_list(1000000)
gen_obj = get_squares_generator(1000000)
print(f"List: {sys.getsizeof(list_obj)} bytes")
print(f"Generator: {sys.getsizeof(gen_obj)} bytes")
```

3. Use __slots__ to reduce memory

```python
class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age

class PersonWithSlots:
    __slots__ = ['name', 'age']

    def __init__(self, name, age):
        self.name = name
        self.age = age

# memory comparison (note: sys.getsizeof does not include the per-instance
# __dict__, which is where __slots__ actually saves memory)
import sys
p1 = Person("Alice", 25)
p2 = PersonWithSlots("Alice", 25)
print(f"Regular object: {sys.getsizeof(p1)} bytes")
print(f"With __slots__: {sys.getsizeof(p2)} bytes")
```

I/O Optimization

1. Batch I/O operations

```python
# bad - one write call per line
def write_lines_slow(filename, lines):
    with open(filename, 'w') as f:
        for line in lines:
            f.write(line + '\n')

# good - join once and write once
def write_lines_fast(filename, lines):
    with open(filename, 'w') as f:
        f.write('\n'.join(lines))
```

2. Use buffering

```python
# bad - unbuffered reads; note buffering=0 is only valid in binary mode
def read_without_buffer(filename):
    with open(filename, 'rb', buffering=0) as f:
        return f.read()

# good - read through a buffer
def read_with_buffer(filename):
    with open(filename, 'r', buffering=8192) as f:
        return f.read()
```

3. Asynchronous I/O

```python
import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all_urls(urls):
    # share one session across all requests instead of opening one per URL
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

urls = [
    "https://www.example.com",
    "https://www.google.com",
    "https://www.github.com",
]

# fetch all URLs concurrently
results = asyncio.run(fetch_all_urls(urls))
```

Concurrency Optimization

1. Use multiple processes for CPU-bound tasks

```python
import multiprocessing

def process_data(data_chunk):
    return sum(x ** 2 for x in data_chunk)

def parallel_processing(data, num_processes=4):
    chunk_size = len(data) // num_processes
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(process_data, chunks)
    return sum(results)

# the guard is required on platforms that spawn worker processes
if __name__ == '__main__':
    data = list(range(1000000))
    result = parallel_processing(data)
```

2. Use multiple threads for I/O-bound tasks

```python
import threading
import requests

def download_url(url):
    response = requests.get(url)
    return len(response.content)

def parallel_download(urls):
    threads = []
    results = []

    def worker(url):
        result = download_url(url)
        results.append(result)  # list.append is thread-safe in CPython

    for url in urls:
        thread = threading.Thread(target=worker, args=(url,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    return results

urls = ["url1", "url2", "url3"]
results = parallel_download(urls)
```

3. Use concurrent.futures

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def process_item(item):
    return item ** 2

def with_thread_pool(items):
    with ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(process_item, items))

def with_process_pool(items):
    with ProcessPoolExecutor(max_workers=4) as executor:
        return list(executor.map(process_item, items))

# the guard is required for ProcessPoolExecutor on spawn-based platforms
if __name__ == '__main__':
    items = list(range(1000))
    thread_results = with_thread_pool(items)
    process_results = with_process_pool(items)
```

Caching Optimization

1. Use functools.lru_cache

```python
from functools import lru_cache

@lru_cache(maxsize=128)
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# memoization turns the exponential recursion into linear time
print(fibonacci(100))
print(fibonacci.cache_info())  # hits, misses, and current cache size
```

2. Custom cache

```python
class Cache:
    def __init__(self, max_size=128):
        self.cache = {}
        self.max_size = max_size

    def get(self, key):
        return self.cache.get(key)

    def set(self, key, value):
        if len(self.cache) >= self.max_size:
            # evict the oldest entry (dicts preserve insertion order)
            self.cache.pop(next(iter(self.cache)))
        self.cache[key] = value

cache = Cache()

def expensive_computation(x):
    cached_result = cache.get(x)
    if cached_result is not None:
        return cached_result
    result = sum(i ** 2 for i in range(x))
    cache.set(x, result)
    return result
```
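
Note that the Cache above evicts in insertion (FIFO) order rather than least-recently-used order. When true LRU behavior is needed without functools, a minimal sketch built on collections.OrderedDict:

```python
from collections import OrderedDict

class LRUCache:
    """A minimal LRU cache sketch built on OrderedDict."""

    def __init__(self, max_size=128):
        self.cache = OrderedDict()
        self.max_size = max_size

    def get(self, key):
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)  # mark as most recently used
        return self.cache[key]

    def set(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.max_size:
            self.cache.popitem(last=False)  # evict the least recently used
        self.cache[key] = value
```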

3. Use Redis for caching

```python
import redis
import pickle

# connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)

def cache_result(key, value, ttl=3600):
    """Cache a result with a time-to-live."""
    r.setex(key, ttl, pickle.dumps(value))

def get_cached_result(key):
    """Fetch a cached result, or None on a miss."""
    result = r.get(key)
    if result:
        return pickle.loads(result)
    return None

def expensive_operation(data):
    cache_key = f"result:{hash(str(data))}"

    # try the cache first
    cached = get_cached_result(cache_key)
    if cached is not None:
        return cached

    # compute (complex_computation is a placeholder for the real work)
    result = complex_computation(data)

    # cache the result
    cache_result(cache_key, result)
    return result
```

String Optimization

1. Use join instead of +

```python
# bad - repeated concatenation with +
def build_string_slow(parts):
    result = ""
    for part in parts:
        result += part
    return result

# good - a single join
def build_string_fast(parts):
    return ''.join(parts)

# performance comparison
import timeit
parts = ["part"] * 1000
print(timeit.timeit(lambda: build_string_slow(parts), number=100))
print(timeit.timeit(lambda: build_string_fast(parts), number=100))
```

2. Use string formatting

```python
import timeit

# bad - concatenation
def format_message_slow(name, age):
    return "Name: " + name + ", Age: " + str(age)

# good - an f-string
def format_message_fast(name, age):
    return f"Name: {name}, Age: {age}"

# performance comparison
print(timeit.timeit(lambda: format_message_slow("Alice", 25), number=10000))
print(timeit.timeit(lambda: format_message_fast("Alice", 25), number=10000))
```

3. Use string methods

```python
import timeit

# bad - character-by-character processing
def process_string_slow(s):
    result = ""
    for char in s:
        if char.isupper():
            result += char.lower()
        else:
            result += char
    return result

# good - use the built-in method, implemented in C
def process_string_fast(s):
    return s.lower()

# performance comparison
print(timeit.timeit(lambda: process_string_slow("HELLO"), number=10000))
print(timeit.timeit(lambda: process_string_fast("HELLO"), number=10000))
```

Database Optimization

1. Use a connection pool

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# create a pooled engine
engine = create_engine(
    'postgresql://user:password@localhost/dbname',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=5
)

def execute_query(query):
    with engine.connect() as connection:
        # SQLAlchemy 2.x requires raw SQL strings to be wrapped in text()
        result = connection.execute(text(query))
        return result.fetchall()
```

2. Batch inserts

```python
# bad - one round trip per row (db is a placeholder DB-API cursor)
def insert_slow(items):
    for item in items:
        db.execute("INSERT INTO table VALUES (%s)", (item,))

# good - a single executemany call
def insert_fast(items):
    db.executemany("INSERT INTO table VALUES (%s)", [(item,) for item in items])
```

3. Use indexes

```sql
-- create an index
CREATE INDEX idx_name ON users(name);

-- query that can use the index
SELECT * FROM users WHERE name = 'Alice';

-- avoid full table scans:
-- bad - wrapping the column in a function defeats the index
SELECT * FROM users WHERE LOWER(name) = 'alice';

-- good - match the indexed column directly
SELECT * FROM users WHERE name = 'Alice';
```
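
To see an index at work end to end from Python, here is a self-contained sketch using the standard-library sqlite3 module (the users table and its contents are hypothetical):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)",
                 [("Alice",), ("Bob",), ("Carol",)])

# create the index
conn.execute("CREATE INDEX idx_name ON users(name)")

# EXPLAIN QUERY PLAN reports whether the index is used
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM users WHERE name = ?", ("Alice",)
).fetchall()
print(plan)  # the plan should mention idx_name
conn.close()
```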

Best Practices

1. Preallocate memory

```python
# bad - grow the list one append at a time
def build_list_slow():
    result = []
    for i in range(10000):
        result.append(i)
    return result

# good - a list comprehension sizes and fills the list more efficiently
def build_list_fast():
    return [i for i in range(10000)]
```
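
When the final size is known up front and elements are assigned by index, the list can also be preallocated outright; a minimal sketch:

```python
# preallocate the full list, then fill slots by index
def build_list_preallocated(n=10000):
    result = [0] * n
    for i in range(n):
        result[i] = i
    return result
```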

2. Avoid global variables

```python
# bad - mutate a global variable
counter = 0

def increment_global():
    global counter
    counter += 1

# good - work with local values
def increment_local(counter):
    return counter + 1
```
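
A related micro-optimization: inside hot loops, local names resolve faster than globals. A minimal sketch of the classic workaround, binding a global built-in to a local default (the parameter name `_abs` is chosen here for illustration):

```python
# abs is resolved as a global on every iteration
def sum_abs_global(data):
    total = 0
    for x in data:
        total += abs(x)
    return total

# _abs is bound once, at definition time, to a fast local name
def sum_abs_local(data, _abs=abs):
    total = 0
    for x in data:
        total += _abs(x)
    return total
```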

3. Use appropriate data types

```python
# less compact - a list of full Python int objects
numbers = [1, 2, 3, 4, 5]

# more compact - a typed array
import array
numbers = array.array('i', [1, 2, 3, 4, 5])

# bad - storing binary data in a str
data = "binary data"

# good - use bytes
data = b"binary data"
```

4. Lazy loading

```python
# bad - load and process everything up front
# (large_dataset and process_item are placeholders)
def load_all_data():
    data = []
    for item in large_dataset:
        processed = process_item(item)
        data.append(processed)
    return data

# good - yield items on demand
def load_data_lazy():
    for item in large_dataset:
        yield process_item(item)
```

Performance Monitoring

1. Log performance with logging

```python
import functools
import logging
import time

logging.basicConfig(level=logging.INFO)

def logged_function(func):
    @functools.wraps(func)  # preserve the wrapped function's metadata
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        logging.info(f"{func.__name__} took {end_time - start_time:.4f} seconds")
        return result
    return wrapper

@logged_function
def expensive_function():
    time.sleep(1)
    return "Done"

expensive_function()
```

2. Use performance counters

```python
import time
from collections import defaultdict

class PerformanceMonitor:
    def __init__(self):
        self.counters = defaultdict(list)

    def record(self, name, duration):
        self.counters[name].append(duration)

    def get_stats(self, name):
        durations = self.counters[name]
        return {
            'count': len(durations),
            'total': sum(durations),
            'average': sum(durations) / len(durations),
            'min': min(durations),
            'max': max(durations),
        }

monitor = PerformanceMonitor()

def monitored_function(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        monitor.record(func.__name__, end_time - start_time)
        return result
    return wrapper
```
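
A short usage sketch tying the decorator and the stats together (the sleep is a stand-in for real work):

```python
import time

@monitored_function
def sample_task():
    time.sleep(0.01)  # stand-in for real work

for _ in range(5):
    sample_task()

print(monitor.get_stats('sample_task'))
```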

Summary

Key points of Python performance optimization:

  1. Profiling tools: timeit, cProfile, memory_profiler, line_profiler
  2. Algorithm optimization: choose the right algorithm, use built-in functions, avoid unnecessary computation
  3. Data structure optimization: use the right data structure, use generators, use __slots__
  4. I/O optimization: batch operations, buffering, asynchronous I/O
  5. Concurrency optimization: multiprocessing, multithreading, concurrent.futures
  6. Caching optimization: lru_cache, custom caches, Redis caching
  7. String optimization: use join, string formatting, string methods
  8. Database optimization: connection pools, batch inserts, indexes
  9. Best practices: preallocate memory, avoid global variables, use appropriate data types, lazy loading
  10. Performance monitoring: logging, performance counters

Principles of performance optimization:

  • Measure first, optimize second
  • Optimize the bottleneck, not all of the code
  • Balance readability against performance
  • Use built-in functions and libraries
  • Consider C extensions or Cython (see the sketch below)
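
As a minimal sketch of that last point, assuming Cython is installed (pip install cython): in Cython's pure-Python mode the file below runs as ordinary Python, and compiling it moves the typed loop into C. The module name fib_cy is illustrative.

```python
# fib_cy.py - Cython pure-Python mode; compile with: cythonize -i fib_cy.py
import cython

def fib(n: cython.int) -> cython.long:
    # the type annotations let the Cython compiler generate a plain C loop
    a: cython.long = 0
    b: cython.long = 1
    i: cython.int
    for i in range(n):
        a, b = b, a + b
    return a
```

After compilation, `from fib_cy import fib` imports the compiled extension exactly like a regular Python module.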

Mastering these optimization techniques will let you write faster, more efficient Python programs.

Tags: Python