Python 秒杀系统实战:库存预扣 + 防超卖 极致优化实现

简介: 小张的二手球鞋秒杀系统曾因高并发导致超卖(库存变负)。本文详解三大优化方案:①数据库行锁(简单但性能低);②Redis+Lua预扣库存(原子性防超卖);③消息队列异步落库+令牌桶限流。最终实现万级并发下零超卖、精准库存与系统稳定。(239字)


小张是个独立开发者,自己搭了个二手球鞋交易平台。每次限量款发售,他都得提前喝两杯咖啡盯着后台。因为一秒内涌入上千人抢十双鞋,数据库瞬间卡死,卖出了15双——库存成了负数。他下定决心要重写秒杀系统。
代理 IP 使用小技巧 让你的数据抓取效率翻倍 (47).png

场景还原:库存为什么变成负数
先看小张原来的代码:

def create_order(user_id, product_id):
product = Product.query.get(product_id)
if product.stock > 0:
product.stock -= 1
db.session.commit()
create_order_record(user_id, product_id)
return "成功"
return "已售罄"

看起来没问题,但高并发下藏着巨大的坑。当两个请求同时读到product.stock = 1,都判断stock > 0成立,然后各自减1提交,库存就从1变成了-1。

问题的根源在于“读-判断-写”不是原子操作。多个请求交错执行,互相看不见对方正要做的修改。

方案一:数据库行锁(最直接的防线)
用数据库的行锁机制,让更新操作变成串行执行。MySQL的InnoDB引擎在更新时会自动锁住这一行。

def create_order_with_lock(user_id, product_id):
from sqlalchemy import text

# 用原生SQL加FOR UPDATE,锁住这一行
sql = text("SELECT stock FROM products WHERE id = :pid FOR UPDATE")
product = db.session.execute(sql, {"pid": product_id}).fetchone()

if product.stock > 0:
    update_sql = text("UPDATE products SET stock = stock - 1 WHERE id = :pid")
    db.session.execute(update_sql, {"pid": product_id})
    db.session.commit()
    create_order_record(user_id, product_id)
    return "成功"
db.session.rollback()
return "已售罄"

FOR UPDATE让第一个拿到锁的事务执行完之前,其他所有请求都在排队等待。这样库存肯定减不超,但性能直线下降——每个请求都得等前一个提交才能继续。

适合并发量不大(每秒几十到一百)的场景。小张的平台高峰时上千人抢,这么搞数据库连接池会瞬间爆满。

方案二:Redis预扣库存(性能飞跃)
把库存从数据库搬到内存里。Redis单线程处理命令,天然就没有并发问题。

import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

def init_seckill(product_id, stock):
"""秒杀活动开始前,把库存存入Redis"""
r.set(f"stock:{product_id}", stock)

# 记录已下单的用户,防止重复抢
r.delete(f"users:{product_id}")

def seckill(user_id, product_id):

# 用Lua脚本保证原子性
lua_script = """
local stock_key = KEYS[1]
local users_key = KEYS[2]
local user_id = ARGV[1]

-- 检查是否已经抢过
if redis.call('sismember', users_key, user_id) == 1 then
    return -1
end

-- 扣库存
local stock = redis.call('decr', stock_key)
if stock >= 0 then
    redis.call('sadd', users_key, user_id)
    return stock
else
    -- 库存不足,回滚(实际上decr负数后没法回滚,所以先检查)
    return -2
end
"""

# 改进版:先检查再扣减
lua_fixed = """
local stock_key = KEYS[1]
local users_key = KEYS[2]
local user_id = ARGV[1]

if redis.call('sismember', users_key, user_id) == 1 then
    return -1
end

local stock = redis.call('get', stock_key)
if not stock or tonumber(stock) <= 0 then
    return -2
end

redis.call('decr', stock_key)
redis.call('sadd', users_key, user_id)
return tonumber(stock) - 1
"""

stock_key = f"stock:{product_id}"
users_key = f"users:{product_id}"

result = r.eval(lua_fixed, 2, stock_key, users_key, user_id)

if result == -1:
    return "您已经抢过了"
elif result == -2:
    return "已售罄"
else:
    # 异步写入数据库
    async_save_order(user_id, product_id)
    return f"抢到了,剩余{result}件"

Lua脚本在Redis里是原子执行的,整个过程不会被其他命令打断。decr之前先get检查库存,彻底杜绝超卖。

方案三:消息队列削峰填谷
Redis扛住了抢购请求,但每个成功用户都要写数据库创建订单。上万请求同时写数据库,照样会崩。

用消息队列把写操作变成异步的。用户点击抢购后立刻返回“排队中”,后台慢慢处理。

import pika
import threading
from queue import Queue

简单的内存队列(适合单机演示)

order_queue = Queue(maxsize=10000)

def async_save_order(user_id, product_id):
"""生产者:把订单放入队列"""
order_queue.put({
"user_id": user_id,
"product_id": product_id,
"timestamp": time.time()
})

def order_worker():
"""消费者:后台线程慢慢写数据库"""
while True:
order_data = order_queue.get()
try:

        # 这里写数据库
        create_order_record(order_data["user_id"], order_data["product_id"])
        print(f"订单已保存: {order_data}")
    except Exception as e:
        print(f"保存失败: {e}")
        # 失败重试逻辑
        time.sleep(1)
        order_queue.put(order_data)
    finally:
        order_queue.task_done()

启动4个后台线程并发消费

for _ in range(4):
t = threading.Thread(target=order_worker, daemon=True)
t.start()

实际生产环境会用RabbitMQ或Kafka。消费者按数据库能承受的速度慢慢处理,秒杀瞬间的流量洪峰就被削平了。

极致优化:令牌桶限流
即便Redis性能再好,也不可能无限扩展。加上限流,保护系统不被恶意刷单击垮。

import time

class TokenBucket:
def init(self, rate, capacity):
self.rate = rate # 每秒补充的令牌数
self.capacity = capacity # 桶的容量
self.tokens = capacity
self.last_refill = time.time()

def acquire(self):
    now = time.time()
    # 补充令牌
    elapsed = now - self.last_refill
    self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
    self.last_refill = now

    if self.tokens >= 1:
        self.tokens -= 1
        return True
    return False

每个商品独立限流器,每秒只放行100个请求

limiter = TokenBucket(rate=100, capacity=100)

def seckill_with_limit(user_id, product_id):
if not limiter.acquire():
return "系统繁忙,请稍后再试"
return seckill(user_id, product_id)

令牌桶比计数器算法更平滑。漏桶强行让请求匀速通过,令牌桶允许短时间突发流量——比如前0.5秒用掉100个令牌,后0.5秒就只能等令牌慢慢补充。

完整实战:从开始到结束
把上面所有组件拼起来,形成一个完整的秒杀流程:

from flask import Flask, request, jsonify
import redis
import threading
import time

app = Flask(name)
r = redis.Redis(decode_responses=True)

初始化秒杀活动

def init_seckill(product_id, stock, total_limit=1000):
r.set(f"stock:{product_id}", stock)
r.set(f"total_limit:{product_id}", total_limit)
r.delete(f"users:{product_id}")

优化的Lua脚本(检查+扣减+去重)

SECILL_LUA = """
local stock = redis.call('get', KEYS[1])
if not stock or tonumber(stock) <= 0 then
return 0
end
local user_exists = redis.call('sismember', KEYS[2], ARGV[1])
if user_exists == 1 then
return -1
end
redis.call('decr', KEYS[1])
redis.call('sadd', KEYS[2], ARGV[1])
return 1
"""

@app.route('/seckill/')
def seckill_api(product_id):
user_id = request.args.get('user_id')
if not user_id:
return jsonify({"code": 400, "msg": "缺少user_id"})

stock_key = f"stock:{product_id}"
users_key = f"users:{product_id}"

result = r.eval(SECILL_LUA, 2, stock_key, users_key, user_id)

if result == 0:
    return jsonify({"code": 200, "msg": "已售罄"})
elif result == -1:
    return jsonify({"code": 200, "msg": "每人限购一件"})
else:
    # 异步落库
    order_queue.put({"user_id": user_id, "product_id": product_id})
    return jsonify({"code": 200, "msg": "抢购成功,正在处理"})

if name == 'main':
init_seckill(1, 10) # 商品1号,库存10件
app.run(debug=False, threaded=True)

用wrk或ab压测一下:

模拟200个并发,总共1000个请求

wrk -t4 -c200 -d10s --timeout=2s "http://localhost:5000/seckill/1?user_id=123"

Redis轻松扛住几千并发,库存始终没超卖,数据库订单表也因为有队列保护而稳如泰山。

踩坑经验分享
Redis挂了怎么办? 秒杀开始前做一次库存全量备份到数据库。Redis宕机时快速降级到数据库行锁方案,虽然慢但不会丢失订单。

用户重复点击怎么办? 前端按钮置灰只做一半工作。真正防重靠Lua脚本里的sismember检查。更彻底的办法是用SETNX给每个用户加一个短时锁:

def prevent_double_click(user_id, product_id):
lock_key = f"click_lock:{user_id}:{product_id}"
if r.setnx(lock_key, 1):
r.expire(lock_key, 2) # 2秒过期
return True
return False

库存热key问题:几千人抢同一个商品,Redis单节点网卡可能被打满。可以用本地缓存分摊压力:

from cachetools import TTLCache

local_cache = TTLCache(maxsize=100, ttl=1) # 1秒过期

def get_stock_with_cache(product_id):
if product_id in local_cache:
return local_cache[product_id]
stock = r.get(f"stock:{product_id}")
local_cache[product_id] = stock
return stock

每个服务器节点缓存1秒,把Redis的查询压力降低几十倍。

最终架构图
用户请求 → Nginx限流 → Flask应用 → 令牌桶限流 → Redis预扣库存 → 消息队列 → 数据库落库

每一层都是漏斗结构,流量从外到内逐渐收敛。最外层的Nginx挡住恶意刷量,Redis只处理真正的库存操作,数据库最终只写入成功订单。

小张按照这套架构重写了秒杀系统。下一款限量球鞋发售时,后台监控面板一片绿色,10双鞋在0.3秒内被抢光,库存精准归零。他终于可以不用喝咖啡盯后台了。

目录
相关文章
|
17天前
|
SQL 人工智能 安全
为什么你的AI Agent总输出垃圾?因为你没装“技能插件”
本文揭示AI Agent“做事乱”的根源:并非模型能力不足,而是缺乏可执行的技能插件(Skill)。文章指出,大模型缺的不是推理力,而是“怎么做”的上下文——如读文件、查数据库、调API等实操能力。通过MCP协议+工具函数,Skill将业务知识封装为即插即用的数字资产,让Agent从“纸上谈兵的参谋”升级为“自带工具箱的施工队”。
|
1月前
|
存储 人工智能 JavaScript
Prompt、Context、Harness:AI Agent 工程的三层架构解析
2023年重“Prompt”(如何说),2025年重“Context”(看到什么),2026年跃升至“Harness”(系统级约束与验证)。三者非替代而是分层:Prompt优化表达,Context管理信息环境,Harness构建可信执行系统——模型是马,Harness才是缰绳、马鞍与路。
746 10
Prompt、Context、Harness:AI Agent 工程的三层架构解析
|
2月前
|
Ubuntu Linux Docker
超全 Docker 镜像源配置指南|Windows/Mac/Linux一键搞定,拉镜像再也不卡顿
Docker拉取官方镜像慢到离谱,要么超时报错,要么中途断连,折腾半天连基础镜像都拉不下来,直接拖慢整个开发进度。 其实解决办法很简单——配置专属镜像源!今天给大家带来镜像源全平台配置教程,覆盖Linux(Ubuntu/CentOS通用)、Windows/Mac版Docker Desktop,甚至Mac专属轻量工具OrbStack,一步一图+命令复制即用,彻底告别镜像拉取卡顿!
3242 9
|
28天前
|
人工智能 自然语言处理 测试技术
DeepSeek V4:百万上下文,万亿参数,以及重新泛起涟漪的开源池塘
DeepSeek V4发布Pro(1.6T参数/49B激活)与Flash(284B/13B)双模型,均支持1M上下文、thinking模式及Agent能力。全栈开源(权重+技术报告+API+定价),采用混合注意力架构显著降本,中文长文本与推理能力突出,是当前少有的万亿级开源系统级发布
1635 4
DeepSeek V4:百万上下文,万亿参数,以及重新泛起涟漪的开源池塘
|
10天前
|
人工智能 自然语言处理 BI
用办公Agent接管Excel苦力活:跨表匹配、格式清洗、自动图表生成
本文揭秘如何用AI办公Agent自动化处理Excel月度报表:15分钟搞定跨表匹配(模糊+精确双策略)、智能清洗(日期/数字/空白全覆盖)、自动绘图(配色+标题+标签)。告别VLOOKUP、分列、手动调图,让重复劳动归零——真正的效率革命,始于教会机器做脏活。
106 4
|
12天前
|
缓存 前端开发 NoSQL
办公Agent架构设计:如何让一个Agent同时服务销售、运营、人事部门?
本文讲述一个企业级多部门Agent从混乱到优雅的架构演进:直面意图冲突、权限隔离与知识打架三大难题,通过V1失败尝试、V2部门路由+上下文隔离、V3分层知识库(公共/部门/个人)三阶段迭代,最终实现单Agent安全、精准、高效服务销售、运营、人事等多部门。含真实避坑经验与落地案例。(240字)
102 4
|
9天前
|
消息中间件 网络协议 API
OpenClaw反应慢?:排查是代理慢还是模型慢,7个真实原因分析
本文系统解析OpenClaw“反应慢”的7大真实原因,按网络层→模型层→应用层顺序,提供可落地的排查路径与优化方案,助你精准定位瓶颈、告别盲目怀疑代理,提升响应效率。(239字)
218 0
|
4月前
|
存储 Linux 开发者
用pathlib替代os.path:现代Python路径操作最佳实践
本文对比Python中`os.path`与`pathlib`的路径处理方式,展示`pathlib`如何以面向对象、跨平台、易读性强的优势成为现代开发首选。涵盖路径构建、解析、文件操作、目录遍历等场景,结合实例说明其简洁性与实用性,并提供迁移策略与常见问题解答,助力开发者高效掌握现代化路径操作。
370 1

热门文章

最新文章