一、起点:当并发只是一个“配置项”
在多数采集项目的早期,并发控制几乎是一个无需讨论的问题。
我们会在配置文件里写下一个数字,比如 10、20、50,然后根据服务器配置或“经验”进行微调。这个阶段的并发模型有几个典型特征:
- 并发上限是静态的
- 性能调优依赖人工经验
- 假设网络、代理、目标站点都是“相对稳定的”
这种模型在小规模、短周期任务中通常没有问题,甚至非常高效。但一旦进入真实生产环境,问题就会逐渐暴露。
并发数本身并不等于吞吐量。
当请求链路中引入代理 IP、跨地域网络、反爬策略之后,“固定并发”开始变成系统中最脆弱的一环。
二、并发控制 1.0:固定并发模型的边界
固定并发模型的优势非常明显:
- 实现简单
- 易于理解
- 资源占用可预测
但它隐含了一个前提:
外部环境变化速度,低于系统决策速度。
现实恰恰相反。
在真实运行中你会看到这些现象:
- 某一时间段代理 IP 响应突然变慢
- 目标站点在高频访问后临时限速
- 网络抖动导致超时率瞬间升高
固定并发对此毫无感知。
它不会“意识到”系统已经进入高风险区,只会继续按原速度发请求,直到错误率全面爆发。
这一阶段的问题不是“并发太低或太高”,而是:
并发被当成了常量,而不是一个需要被管理的状态。
三、并发控制 2.0:规则驱动的并发调节
当固定并发开始频繁出问题,很多团队会自然演进到第二阶段:规则驱动的并发控制。
典型做法包括:
- 超时次数超过阈值,手动降低并发
- 连续失败 N 次,暂停任务
- 成功率回升后,再人工调高并发
相比 1.0,这种方式已经明显进步:
- 开始关注运行时指标
- 尝试用规则应对异常
- 能在一定程度上避免“雪崩式失败”
但问题也很明显。
规则本身是离线设定的,而环境是连续变化的。
你永远在做“事后反应”,而不是实时调节。
更重要的是,这种模型仍然有一个核心假设:
人,永远比系统更懂什么时候该加速或减速。
在代理 IP 质量波动频繁、反爬策略不透明的场景下,这个假设越来越站不住脚。
四、并发控制 3.0:反馈驱动的自适应模型
真正的转折点,是我们开始把并发控制视为一个反馈系统问题。
并发不再由规则直接决定,而是由运行结果反向塑造。
这个模型的核心变化体现在三个方面:
第一,并发上限变成了一个动态变量
它不再是写死的数字,而是可以随时间上下浮动。
第二,系统直接使用真实请求结果作为输入信号
响应时间、超时、失败,本身就是最真实的环境反馈。
第三,并发调整是渐进且可回退的
没有突变,没有跳跃,系统始终在寻找一个“可接受区间”。
在这个模型下,开始具备某种“环境感知能力”。
五、实战项目:自适应并发采集的工程落地
1. 项目目标
- 在使用代理 IP 的前提下稳定抓取目标页面
- 自动适配代理质量与站点限速变化
- 避免频繁人工调参
代理IP使用的是 亿牛云爬虫代理,通过用户名和密码认证方式接入。
2. 代理配置
PROXY_HOST = "proxy.16yun.cn"
PROXY_PORT = 31111
PROXY_USER = "username" # 16YUN代理用户名
PROXY_PASS = "password" # 16YUN代理密码
PROXY_URL = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
3. 自适应并发控制器
import asyncio
from collections import deque
class AdaptiveConcurrencyController:
def __init__(self, min_limit=2, max_limit=20):
self.min_limit = min_limit
self.max_limit = max_limit
self.current_limit = min_limit
self.semaphore = asyncio.Semaphore(self.current_limit)
# 保存最近一段时间的响应耗时
self.latency_window = deque(maxlen=20)
def record_latency(self, latency):
self.latency_window.append(latency)
def adjust(self):
if len(self.latency_window) < 5:
return
avg_latency = sum(self.latency_window) / len(self.latency_window)
# 响应快,说明系统仍有余量
if avg_latency < 0.8 and self.current_limit < self.max_limit:
self.current_limit += 1
# 响应变慢,说明已接近风险区
elif avg_latency > 2.0 and self.current_limit > self.min_limit:
self.current_limit -= 1
self.semaphore = asyncio.Semaphore(self.current_limit)
4. 请求执行与反馈采集
import aiohttp
import time
async def fetch(url, session, controller):
async with controller.semaphore:
start = time.time()
try:
async with session.get(url, proxy=PROXY_URL, timeout=10) as resp:
await resp.text()
controller.record_latency(time.time() - start)
return resp.status
except Exception:
# 将异常视为高耗时反馈
controller.record_latency(5)
return None
5. 调度逻辑
async def run(urls):
controller = AdaptiveConcurrencyController()
async with aiohttp.ClientSession() as session:
for i in range(0, len(urls), 10):
batch = urls[i:i + 10]
tasks = [fetch(url, session, controller) for url in batch]
await asyncio.gather(*tasks)
controller.adjust()
print("当前并发上限:", controller.current_limit)
六、架构视角下的结论
回顾整个并发控制的演化过程,会发现一个明显趋势:
- 1.0 阶段,系统只“执行命令”
- 2.0 阶段,系统开始“遵循规则”
- 3.0 阶段,系统学会“理解反馈”
并发控制的本质,正在从参数管理转向系统能力建设。
当系统能够根据真实环境自行调整节奏时,稳定性不再依赖经验,而成为一种自然结果。