四、并发与并行
4.1 GIL(全局解释器锁)
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# GIL impact: a CPU-bound task
def cpu_bound(n):
    """Sum the squares of 0..n-1 — pure CPU work that holds the GIL."""
    return sum(i ** 2 for i in range(n))
def io_bound(n):
    """Simulated I/O-bound task: sleep *n* seconds, then echo *n* back."""
    time.sleep(n)
    return n
# Measure the GIL's effect on CPU-bound work
def test_threading():
    """Run cpu_bound on 4 threads; the GIL serializes the bytecode, so this gains little."""
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = [pool.submit(cpu_bound, 10_000_000) for _ in range(4)]
        results = [task.result() for task in pending]
    print(f"多线程CPU任务耗时: {time.time() - start:.2f}秒")
def test_multiprocessing():
    """Run cpu_bound on 4 processes; separate interpreters bypass the GIL."""
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as pool:
        pending = [pool.submit(cpu_bound, 10_000_000) for _ in range(4)]
        results = [task.result() for task in pending]
    print(f"多进程CPU任务耗时: {time.time() - start:.2f}秒")
# 执行测试(注意:多进程开销较大)
# test_threading()
# test_multiprocessing()
4.2 多线程与多进程
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# Thread-safety example
class Counter:
    """A counter whose updates are serialized by a mutex."""

    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        # The lock makes the read-modify-write atomic across threads.
        with self.lock:
            self.value = self.value + 1

    def get_value(self):
        # Lock on read too, so a value is never observed mid-update.
        with self.lock:
            return self.value
# 生产者-消费者模式
import queue
def producer(q, items):
    """Feed every item onto queue *q*, then enqueue a None sentinel."""
    for element in items:
        q.put(element)
        print(f"生产: {element}")
    q.put(None)  # sentinel: tells the consumer to stop
def consumer(q):
    """Drain queue *q* until the None sentinel arrives."""
    while (item := q.get()) is not None:
        print(f"消费: {item}")
# 多进程共享内存
from multiprocessing import shared_memory
def create_shared_array(size):
    """Allocate a shared-memory block and expose it as a uint8 numpy array.

    Returns (array, shm).  The caller owns the SharedMemory handle and is
    responsible for close()/unlink() when done.
    """
    import numpy as np
    block = shared_memory.SharedMemory(create=True, size=size)
    view = np.ndarray((size,), dtype=np.uint8, buffer=block.buf)
    return view, block
# Using a process pool
def worker(x):
    """Square the input; a trivially picklable function for pool workers."""
    return x * x
def process_pool_demo():
    """Map worker over 0..9 across 4 processes and print the squares in order."""
    with ProcessPoolExecutor(max_workers=4) as pool:
        squares = list(pool.map(worker, range(10)))
    print(f"并行计算结果: {squares}")
4.3 异步编程(asyncio)
import asyncio
import aiohttp
import time
# Event-loop basics
async def hello():
    """Print two words separated by a one-second asynchronous pause."""
    print("Hello")
    await asyncio.sleep(1)
    print("World")
# Drive the coroutine to completion on a fresh event loop
asyncio.run(hello())
# Running several coroutines concurrently
async def fetch_data(delay, name):
    """Simulate an I/O fetch taking *delay* seconds; return a result string."""
    print(f"开始获取 {name}")
    await asyncio.sleep(delay)
    print(f"完成获取 {name}")
    return f"{name}的数据"
async def concurrent_tasks():
    """Launch three fetches concurrently; gather returns results in task order."""
    specs = [(2, "任务A"), (1, "任务B"), (3, "任务C")]
    tasks = [asyncio.create_task(fetch_data(delay, label)) for delay, label in specs]
    # gather waits for every task and preserves submission order
    return await asyncio.gather(*tasks)
# Asynchronous HTTP requests
async def fetch_url(session, url):
    """GET *url* through the given aiohttp session and return the body text."""
    async with session.get(url) as response:
        body = await response.text()
        return body
async def async_http_demo():
    """Fetch three httpbin endpoints concurrently and report each body size."""
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    ]
    async with aiohttp.ClientSession() as session:
        bodies = await asyncio.gather(*(fetch_url(session, u) for u in urls))
    for u, body in zip(urls, bodies):
        print(f"{u}: {len(body)} 字节")
# Asynchronous context manager
class AsyncResource:
    """A resource whose acquire and release steps are awaitable."""

    async def __aenter__(self):
        print("获取资源")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("释放资源")
        await asyncio.sleep(0.1)
async def use_async_resource():
    """Demonstrate the async-with protocol against AsyncResource."""
    async with AsyncResource() as _resource:
        print("使用资源")
# Asynchronous iterator
class AsyncCounter:
    """Asynchronously yields 1..limit with a short pause between values."""

    def __init__(self, limit):
        self.limit = limit   # number of values to produce
        self.count = 0       # last value produced

    def __aiter__(self):
        # The counter is its own iterator.
        return self

    async def __anext__(self):
        if self.count >= self.limit:
            raise StopAsyncIteration
        self.count += 1
        await asyncio.sleep(0.1)
        return self.count
async def iterate_async():
    """Consume AsyncCounter(5) with an async for loop."""
    async for value in AsyncCounter(5):
        print(f"异步迭代: {value}")
五、模块与包架构
5.1 模块导入机制
import sys
import importlib
# Show the first few entries of the module search path
print("模块搜索路径:")
for entry in sys.path[:5]:
    print(f" {entry}")
# Custom import hook
class CustomImporter:
    """Meta-path finder/loader that fabricates the module 'custom_module'."""

    def find_spec(self, name, path, target=None):
        # Only claim the one synthetic module; defer everything else.
        if name != "custom_module":
            return None
        return importlib.machinery.ModuleSpec(name, self)

    def create_module(self, spec):
        # type(sys) is the module type; build an empty module and attach an attr.
        module = type(sys)(spec.name)
        module.hello = lambda: "Hello from custom module"
        return module

    def exec_module(self, module):
        # Nothing to execute — the module was fully built in create_module.
        pass
# Install the custom importer at the front of the meta path (highest priority)
sys.meta_path.insert(0, CustomImporter())
# Import the synthetic module served by CustomImporter
import custom_module
print(custom_module.hello())
# How many modules have been imported so far
print(f"\n已导入模块数量: {len(sys.modules)}")
# Dynamic import by name
target = "math"
math_module = importlib.import_module(target)
print(f"动态导入 {target}.sqrt(16) = {math_module.sqrt(16)}")
# Reloading a module
import json
print(f"JSON版本: {getattr(json, '__version__', 'unknown')}")
importlib.reload(json)
5.2 包的架构设计
# 包的结构示例
"""
my_package/
├── __init__.py # 包初始化文件
├── module1.py
├── module2.py
├── subpackage/
│ ├── __init__.py
│ └── submodule.py
└── utils/
├── __init__.py
├── helpers.py
└── validators.py
"""
# __init__.py 的作用
# my_package/__init__.py
"""
# 控制包的导入行为
__all__ = ['module1', 'module2'] # from package import * 时导入的模块
# 包级别的初始化代码
print("初始化 my_package")
# 简化导入路径
from .module1 import Class1
from .module2 import function2
# 包版本信息
__version__ = "1.0.0"
__author__ = "Python架构师"
"""
# 命名空间包(Python 3.3+)
# 不需要 __init__.py 的包
# 相对导入
# from . import module1
# from .. import parent_module
# from .subpackage import submodule
# Lazy module loading
class LazyLoader:
    """Proxy that defers importing *module_name* until first attribute access.

    Works for dotted names too: importlib.import_module("os.path") returns the
    leaf module, whereas the bare __import__ builtin the original used returns
    the top-level package (so e.g. LazyLoader("os.path").join would fail).
    """

    def __init__(self, module_name):
        self.module_name = module_name
        self._module = None  # populated on first use

    def _load(self):
        # Import only once, on first demand.
        if self._module is None:
            import importlib
            self._module = importlib.import_module(self.module_name)
        return self._module

    def __getattr__(self, name):
        # Invoked only for attributes not found on the proxy itself;
        # delegates to the (lazily imported) real module.
        return getattr(self._load(), name)
# Lazy loading in action
lazy_math = LazyLoader("math")
print(lazy_math.sqrt(16)) # the module is only imported on first use
5.3 依赖管理
# Dependency injection (DI) example
class Service:
    """A named service that can describe the work it performs."""

    def __init__(self, name):
        self.name = name

    def serve(self):
        return f"{self.name}提供服务"
class Client:
    """Consumer that delegates its work to an injected Service."""

    def __init__(self, service: Service):
        self.service = service

    def do_work(self):
        # All behavior comes from the injected dependency.
        return self.service.serve()
# Manual dependency injection: construct the dependency, then hand it in
service = Service("数据库服务")
client = Client(service)
print(client.do_work())
# A minimal dependency-injection container
class Container:
    """Registry mapping service names to factory callables."""

    def __init__(self):
        self._services = {}

    def register(self, name, factory):
        # *factory* is called with the container so it can resolve dependencies.
        self._services[name] = factory

    def resolve(self, name):
        maker = self._services.get(name)
        if maker is None:
            raise ValueError(f"服务 {name} 未注册")
        return maker(self)

    def register_instance(self, name, instance):
        # Wrap the instance so resolution always yields the same object.
        self._services[name] = lambda _container: instance
# Using the container
container = Container()
container.register("logger", lambda c: print("日志服务"))
container.register("database", lambda c: {"host": "localhost", "port": 3306})
# Deferred resolution
class LazyService:
    """Callable proxy that resolves its service on first invocation only."""

    def __init__(self, container, service_name):
        self.container = container
        self.service_name = service_name
        self._instance = None  # cached after the first call

    def __call__(self):
        if self._instance is None:
            self._instance = self.container.resolve(self.service_name)
        return self._instance
# Managing dependencies with a context manager
class DependencyScope:
    """Caches container resolutions for the lifetime of a with-block."""

    def __init__(self, container):
        self.container = container
        self._scoped_services = {}

    def get(self, name):
        # Resolve once per scope; later calls reuse the cached instance.
        if name in self._scoped_services:
            return self._scoped_services[name]
        instance = self.container.resolve(name)
        self._scoped_services[name] = instance
        return instance

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Drop all scoped instances on exit.
        self._scoped_services.clear()
六、性能架构
6.1 性能分析工具
import cProfile
import pstats
import line_profiler
import memory_profiler
import time
# cProfile — function-level profiling target
def fibonacci(n):
    """Return the n-th Fibonacci number (fib(0)=0, fib(1)=1)."""
    if n <= 1:
        return n
    prev, curr = 0, 1
    for _ in range(n - 1):
        prev, curr = curr, prev + curr
    return curr
def profile_demo():
    """Profile fibonacci(30) and print the 10 most expensive entries."""
    profiler = cProfile.Profile()
    profiler.enable()
    fibonacci(30)
    profiler.disable()
    # sort_stats returns the Stats object, so the calls chain
    pstats.Stats(profiler).sort_stats('cumulative').print_stats(10)
# line_profiler — line-by-line profiling
@profile  # 'profile' is injected by kernprof when run under line_profiler
def line_profiler_demo():
    """Add the even numbers below 10000 and subtract the odd ones."""
    return sum(i if i % 2 == 0 else -i for i in range(10000))
# memory_profiler — memory usage analysis
@profile  # 'profile' is injected when run under memory_profiler
def memory_profiler_demo():
    """Allocate and free large lists so memory_profiler can chart the usage."""
    kept = [1] * (10 ** 6)
    temporary = [2] * (2 * 10 ** 7)
    del temporary  # released before returning, visible as a drop in the chart
    return kept
# Custom timing decorator
def performance_timer(func):
    """Decorator that prints the wall-clock duration of each call to *func*.

    Uses functools.wraps so the wrapper keeps the wrapped function's
    __name__/__doc__ (the original version clobbered them with 'wrapper').
    """
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__} 耗时: {end - start:.6f}秒")
        return result
    return wrapper
@performance_timer
def slow_function():
    """Pause briefly so the timing decorator has something to measure."""
    time.sleep(0.1)
    return 42
# 使用 timeit 进行微基准测试
import timeit
def test_list_append():
    """Benchmark body: grow a list one append at a time."""
    items = []
    for value in range(1000):
        items.append(value)
def test_list_comprehension():
    """Benchmark body: build the same list in a single comprehension."""
    numbers = [value for value in range(1000)]
# Compare the two construction styles with timeit (1000 runs each)
print("append耗时:", timeit.timeit(test_list_append, number=1000))
print("comprehension耗时:", timeit.timeit(test_list_comprehension, number=1000))
6.2 性能优化策略
# 1. 使用局部变量减少属性查找
import math
# Slower: looks up math.pi on every call
def slow_circle_area(radius):
    """Area of a circle, paying a module attribute lookup per call."""
    area = math.pi * radius * radius
    return area
# Faster: binds math.pi to a local first
def fast_circle_area(radius):
    """Area of a circle; the local binding avoids repeated attribute lookups."""
    local_pi = math.pi
    return local_pi * radius * radius
# 2. Prefer list comprehensions over explicit loops
# Slower: manual loop with append
def slow_squares(n):
    """Return [0, 1, 4, ...] for the first n integers using an explicit loop."""
    squares = []
    index = 0
    while index < n:
        squares.append(index ** 2)
        index += 1
    return squares
# Faster: single comprehension
def fast_squares(n):
    """Return the squares of the first n integers."""
    return list(value ** 2 for value in range(n))
# 3. Generators keep memory flat
def read_large_file(file_path):
    """Lazily yield each line of *file_path*, stripped of surrounding whitespace."""
    with open(file_path, 'r') as handle:
        for raw_line in handle:
            yield raw_line.strip()
# 4. String concatenation
# Slower: repeated += builds a new string each iteration (quadratic)
def slow_string_concat(items):
    """Concatenate str(item) for every item via repeated +=."""
    combined = ""
    for element in items:
        combined = combined + str(element)
    return combined
# Faster: one join pass
def fast_string_concat(items):
    """Concatenate str(item) for every item in a single join."""
    return ''.join(map(str, items))
# 5. Prefer builtins
# Slower: interpreted accumulation loop
def slow_sum(numbers):
    """Sum *numbers* with an explicit Python-level loop."""
    accumulator = 0
    for value in numbers:
        accumulator = accumulator + value
    return accumulator
# Faster: the C-implemented builtin
def fast_sum(numbers):
    """Sum *numbers* using the builtin sum."""
    return sum(numbers)
# 6. 缓存计算结果
from functools import lru_cache
@lru_cache(maxsize=128)
def fibonacci_cached(n):
    """Memoized Fibonacci; each distinct n is computed at most once."""
    return n if n <= 1 else fibonacci_cached(n - 2) + fibonacci_cached(n - 1)
# 7. __slots__ reduces per-instance memory
class Point:
    """2-D point; __slots__ removes the per-instance __dict__."""
    __slots__ = ('x', 'y')

    def __init__(self, x, y):
        self.x, self.y = x, y
# 8. Avoid repeated attribute lookups inside loops
# Slower variant
class Data:
    """Holds a class-level constant read by the loop examples."""
    value = 10

def slow_loop():
    """Sum Data.value a million times, paying the attribute lookup each pass."""
    total = 0
    for _ in range(1000000):
        total += Data.value
    return total

# Faster variant
def fast_loop():
    """Same sum, but Data.value is hoisted into a local before the loop."""
    total = 0
    cached_value = Data.value  # one lookup instead of a million
    for _ in range(1000000):
        total += cached_value
    return total
6.3 JIT编译与加速
# Numba JIT编译
from numba import jit, njit
@jit(nopython=True)
def numba_fibonacci(n):
    """Naive recursive Fibonacci, compiled to machine code by Numba."""
    if n <= 1:
        return n
    return numba_fibonacci(n - 1) + numba_fibonacci(n - 2)
@njit  # shorthand for jit(nopython=True)
def numba_array_sum(arr):
    """Element-wise sum; the explicit loop is what Numba compiles."""
    total = 0
    for index in range(len(arr)):
        total += arr[index]
    return total
# Cython加速
"""
# cython_example.pyx
def cython_sum(int[:] arr):
cdef int total = 0
cdef int i
for i in range(arr.shape[0]):
total += arr[i]
return total
"""
# PyPy JIT
# PyPy会自动优化循环和数值计算
# 使用numpy加速数值计算
import numpy as np
def pure_python_sum(arr):
    """Sum *arr* with an interpreted Python loop (baseline for comparison)."""
    accumulator = 0
    for element in arr:
        accumulator += element
    return accumulator
def numpy_sum(arr):
    """Sum *arr* in native code via numpy."""
    return np.sum(arr)
# Vectorized operations
def pure_python_vector_add(a, b):
    """Element-wise sum of two sequences using a Python comprehension."""
    return [left + right for left, right in zip(a, b)]
def numpy_vector_add(a, b):
    """Element-wise sum computed by numpy's vectorized +; returns an ndarray."""
    left = np.array(a)
    right = np.array(b)
    return left + right