一文打尽python-web开发的signal机制

简介: signal在flask/django中都是很重要的解耦手段。flask的signal依赖blinker实现,django的signal也很类似。blinker库是纯python实现的代码简单,功能强大的signal库。本文我们从blinker开始,一起了解python-web开发的signal机制。

signal在flask/django中都是很重要的解耦手段。flask的signal依赖blinker实现,django的signal也很类似。blinker库是纯python实现的代码简单,功能强大的signal库。本文我们从blinker开始,一起了解python-web开发的signal机制:


  • blinker的api
  • blinker-signal的实现
  • flask-signal的实现
  • django-signal的实现
  • weakref介绍
  • 小结
  • 小技巧


blinker简介



blinker源码使用 1.4 版本, 项目结构如下:


文件 描述
base.py 核心逻辑
_saferef.py 安全引用相关逻辑
_utilities.py 工具类


blinker的API



blinker的api使用示例:


from blinker import signal
def subscriber1(sender):
    print("1 Got a signal sent by %r" % sender)
def subscriber2(sender):
    print("2 Got a signal sent by %r" % sender)
ready = signal('ready')
print(ready)
ready.connect(subscriber1)
ready.connect(subscriber2)
ready.send("go")


示例的日志输出:


<blinker.base.NamedSignal object at 0x7f93a805ad00; 'ready'>
1 Got a signal sent by 'go'
2 Got a signal sent by 'go'


可以看到signal是发布/订阅模式。或者换个更常见的说法,事件中心:


  • ready = signal('ready') 创建名为ready的事件中心
  • ready.connect(subscriber1) 给ready事件中心添加事件监听器
  • ready.send("go") 向ready事件中心派发事件,这样事件监听器会收到事件并进行处理


signal的实现



signal默认单例,提供开箱即用的API:


class NamedSignal(Signal):
    """A named generic notification emitter."""
    def __init__(self, name, doc=None):
        Signal.__init__(self, doc)
        self.name = name
class Namespace(dict):
    def signal(self, name, doc=None):
        try:
            return self[name]
        except KeyError:
            return self.setdefault(name, NamedSignal(name, doc))
signal = Namespace().signal


需要说明一下的是,signal的单例是和name绑定的。同一个名称得到同一个NamedSignal对象,不同名称得到的NamedSignal对象不一样。


NamedSignal的父类Signal的构造方法,包括1)事件接收器字典receivers:以事件接收器id为key和事件接收器为value;2)接收器ID-发送器ID的字典:以接收器ID为key和发送器ID集合为value;3)和2类似的字典,只不过是反向的,key为发送器ID,value为接收器集合。


ANY = symbol('ANY')
class Signal(object):
    ANY = ANY
    def __init__(self, doc=None)
        self.receivers = {}
        self._by_receiver = defaultdict(set)
        self._by_sender = defaultdict(set)
        ...


Signal的connect函数添加消息接收器,可以看到sender和receiver是多对多的关系。


def connect(self, receiver, sender=ANY, weak=True):
    receiver_id = hashable_identity(receiver)
    receiver_ref = receiver
    sender_id = ANY_ID
    self.receivers.setdefault(receiver_id, receiver_ref)
    self._by_sender[sender_id].add(receiver_id)
    self._by_receiver[receiver_id].add(sender_id)
    del receiver_ref
    return receiver


Signal的send函数将消息发送给所有关注该sender的接收器:


def send(self, *sender, **kwargs):
    sender = sender[0]
    # 循环执行所有的receiver
    return [(receiver, receiver(sender, **kwargs))
            for receiver in self.receivers_for(sender)]
def receivers_for(self, sender):
    sender_id = hashable_identity(sender)
    # 根据sender_id找receiver_id
    if sender_id in self._by_sender:
        # 2个set的合集 
        ids = (self._by_sender[ANY_ID] |
               self._by_sender[sender_id])
    else:
        ids = self._by_sender[ANY_ID].copy()
    for receiver_id in ids:
        receiver = self.receivers.get(receiver_id)
        if receiver is None:
            continue
        # 迭代器
        yield receiver


有始有终,Signal使用disconnect函数注销消息的接收器:


def disconnect(self, receiver, sender=ANY):
    sender_id = ANY_ID
    receiver_id = hashable_identity(receiver)
    self._disconnect(receiver_id, sender_id)
def _disconnect(self, receiver_id, sender_id):
    if sender_id == ANY_ID:
        if self._by_receiver.pop(receiver_id, False):
            for bucket in self._by_sender.values():
                bucket.discard(receiver_id)
        self.receivers.pop(receiver_id, None)
    else:
        self._by_sender[sender_id].discard(receiver_id)
        self._by_receiver[receiver_id].discard(sender_id)


为了便于理解signal机制,我们暂时忽略了weakref相关的代码,稍后再进行介绍。


flask-signal的实现



flask-signal依赖blinker的实现:


# flask.signals.py
from blinker import Namespace
_signals = Namespace()
template_rendered = _signals.signal("template-rendered")
before_render_template = _signals.signal("before-render-template")
request_started = _signals.signal("request-started")
request_finished = _signals.signal("request-finished")
request_tearing_down = _signals.signal("request-tearing-down")
got_request_exception = _signals.signal("got-request-exception")
appcontext_tearing_down = _signals.signal("appcontext-tearing-down")
appcontext_pushed = _signals.signal("appcontext-pushed")
appcontext_popped = _signals.signal("appcontext-popped")
message_flashed = _signals.signal("message-flashed")


从上面代码可以看到flask使用blinker预制了多个signal。以request_started为例, flask在处理request时候会向request_started派发事件:


# flask.app.py
from .signals import request_started
def full_dispatch_request(self):
    ...
    request_started.send(self)
    ...


我们可以在自己的代码中,这样注册事件监听:

def log_request(sender, **extra):
    sender.logger.debug('Request context is set up')
from flask import request_started
request_started.connect(log_request, app)


这样就可以很方便的使用signal获取到flask在各个阶段的数据。


django-signal的实现



django-signal虽然是独立实现,但是模式和blinker非常类似。Signal构造函数创建了一个对象,充当事件中心。


# django/dispatch/dispatcher.py
def _make_id(target):
    if hasattr(target, '__func__'):
        return (id(target.__self__), id(target.__func__))
    return id(target)
NONE_ID = _make_id(None)
# A marker for caching
NO_RECEIVERS = object()
class Signal:
    def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.
        """
        self.receivers = []
        self.lock = threading.Lock()
        self.use_caching = use_caching
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False


connect核心功能就是为事件监听器构建唯一标识(receiver_id,sender_id),然后加入receivers数组。


def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
    from django.conf import settings
    lookup_key = (_make_id(receiver), _make_id(sender))
    ref = weakref.ref
    receiver_object = receiver
    receiver = ref(receiver)
    with self.lock:
        if not any(r_key == lookup_key for r_key, _ in self.receivers):
            self.receivers.append((lookup_key, receiver))


send函数和blinker的send类似:


def send(self, sender, **named):
    return [
        (receiver, receiver(signal=self, sender=sender, **named))
        for receiver in self._live_receivers(sender)
    ]
def _live_receivers(self, sender):
    with self.lock:
        senderkey = _make_id(sender)
        receivers = []
        for (receiverkey, r_senderkey), receiver in self.receivers:
            if r_senderkey == NONE_ID or r_senderkey == senderkey:
                receivers.append(receiver)
    ...
    non_weak_receivers = []
    for receiver in receivers:
        non_weak_receivers.append(receiver)
    return non_weak_receivers


django-signal额外提供了一个receiver装饰器,方便业务使用:


def receiver(signal, **kwargs):
    def _decorator(func):
        if isinstance(signal, (list, tuple)):
            for s in signal:
                s.connect(func, **kwargs)
        else:
            signal.connect(func, **kwargs)
        return func
    return _decorator


django的model中额外包装了ModelSignal类并且预制了一些signal:



class ModelSignal(Signal):
    def _lazy_method(self, method, apps, receiver, sender, **kwargs):
        from django.db.models.options import Options
        # This partial takes a single optional argument named "sender".
        partial_method = partial(method, receiver, **kwargs)
        if isinstance(sender, str):
            apps = apps or Options.default_apps
            apps.lazy_model_operation(partial_method, make_model_tuple(sender))
        else:
            return partial_method(sender)
    def connect(self, receiver, sender=None, weak=True, dispatch_uid=None, apps=None):
        self._lazy_method(
            super().connect, apps, receiver, sender,
            weak=weak, dispatch_uid=dispatch_uid,
        )
...
# 定义模型各个阶段的signal
pre_init = ModelSignal(use_caching=True)
post_init = ModelSignal(use_caching=True)
pre_save = ModelSignal(use_caching=True)
post_save = ModelSignal(use_caching=True)
pre_delete = ModelSignal(use_caching=True)
post_delete = ModelSignal(use_caching=True)
m2m_changed = ModelSignal(use_caching=True)
pre_migrate = Signal()


signal的使用方式在receiver装饰器的注释中有介绍:


"""
A decorator for connecting receivers to signals. Used by passing in the
signal (or list of signals) and keyword arguments to connect::
    @receiver(post_save, sender=MyModel)
    def signal_receiver(sender, **kwargs):
        ...
    @receiver([post_save, post_delete], sender=MyModel)
    def signals_receiver(sender, **kwargs):
        ...
"""


这样利用signal机制,可以对MyModel进行一些额外的逻辑处理,又避免了代码的硬耦合。


weakref 介绍



了解了signal的各种实现和使用后,我们再回头学习blinker-signal中另外一个环节weakref。weakref可以显著提高signal的性能, 请看下面示例:


def test_weak_value_dict(cache):
    c_list = []
    class C:
        def method(self):
            return ("method called!", id(self))
    c1 = C()
    c2 = C()
    c3 = C()
    c_list.append(c1)
    c_list.append(c2)
    c_list.append(c3)
    del c1, c2, c3
    def do_cache(cache, name, target):
        cache[name] = target
    for idx, target in enumerate(c_list):
        do_cache(cache, idx, target)
    for k, v in cache.items():
        print("before", k, v.method())
    del c_list
    gc.collect()
    for x, y in cache.items():
        print("after", x, y.method())
test_weak_value_dict({})
print("==" * 10)
test_weak_value_dict(weakref.WeakValueDictionary())


在test_weak_value_dict函数中,创建了3个对象,将对象放到一个列表和cache中,完成后再删除对象和对象列表并进行gc。如果cache的实现是set,那么gc后cache中任然存在3个对象,也就是对象不会回收;如果是使用WeakValueDictionary实现的cache,则部分对象进行了回收。在一个事件中心,如果监听函数取消后却无法释放回收,内存会持续增长。


before 0 ('method called!', 140431874960640)
before 1 ('method called!', 140431874959440)
before 2 ('method called!', 140431874959968)
after 0 ('method called!', 140431874960640)
after 1 ('method called!', 140431874959440)
after 2 ('method called!', 140431874959968)
====================
before 0 ('method called!', 140431875860416)
before 1 ('method called!', 140431875860128)
before 2 ('method called!', 140431876163136)
after 2 ('method called!', 140431876163136)


为什么WeakValueDictionary还保留最后一个数据呢? 欢迎大家评论区交流


signal 小结



到这里我们可以知道blinker/flask/django的signal都是单纯的python消息中心,和我们之前在gunicorn中使用的系统 signal 完全不一样。消息中心,可以用来进行业务逻辑的解耦,一般就包括三步:


  • 注册监听器
  • 派发事件
  • 注销监听器


小技巧



blinker中提供了一种 单例模式 的实现参考,我把它叫做 分组单例 , 组名相同会得到同一个对象实例:


class _symbol(object):
    def __init__(self, group):
        """Construct a new group symbol."""
        # 原文是name,我把它换成了group,感觉这样更容易理解一些
        self.__group__ = self.group = group
    def __reduce__(self):
        return symbol, (self.group,)
    def __repr__(self):
        return self.group
_symbol.__group__ = 'symbol'
class symbol(object):
    """A constant symbol.
    # group相同的symbol是同一个对象
    >>> symbol('foo') is symbol('foo')
    True
    >>> symbol('foo')
    foo
    """
    symbols = {}
    def __new__(cls, group):
        try:
            return cls.symbols[group]
        except KeyError:
            return cls.symbols.setdefault(group, _symbol(group))
ANY = symbol('ANY')  # 单例


参考链接:




目录
相关文章
|
2月前
|
算法 Java Go
【GoGin】(1)上手Go Gin 基于Go语言开发的Web框架,本文介绍了各种路由的配置信息;包含各场景下请求参数的基本传入接收
gin 框架中采用的路优酷是基于httprouter做的是一个高性能的 HTTP 请求路由器,适用于 Go 语言。它的设计目标是提供高效的路由匹配和低内存占用,特别适合需要高性能和简单路由的应用场景。
272 4
|
4月前
|
存储 监控 算法
淘宝买家秀 API开发实录Python(2025)
本文讲述了作者在电商开发领域,尤其是对接淘宝买家秀 API 接口过程中所经历的挑战与收获。从申请接入、签名验证、频率限制到数据处理和实时监控,作者分享了多个实战经验与代码示例,帮助开发者更高效地获取和处理买家秀数据,提升开发效率。
|
6月前
|
缓存 JavaScript 前端开发
鸿蒙5开发宝藏案例分享---Web开发优化案例分享
本文深入解读鸿蒙官方文档中的 `ArkWeb` 性能优化技巧,从预启动进程到预渲染,涵盖预下载、预连接、预取POST等八大优化策略。通过代码示例详解如何提升Web页面加载速度,助你打造流畅的HarmonyOS应用体验。内容实用,按需选用,让H5页面快到飞起!
|
6月前
|
JavaScript 前端开发 API
鸿蒙5开发宝藏案例分享---Web加载时延优化解析
本文深入解析了鸿蒙开发中Web加载完成时延的优化技巧,结合官方案例与实际代码,助你提升性能。核心内容包括:使用DevEco Profiler和DevTools定位瓶颈、四大优化方向(资源合并、接口预取、图片懒加载、任务拆解)及高频手段总结。同时提供性能优化黄金准则,如首屏资源控制在300KB内、关键接口响应≤200ms等,帮助开发者实现丝般流畅体验。
|
前端开发 JavaScript Shell
鸿蒙5开发宝藏案例分享---Web页面内点击响应时延分析
本文为鸿蒙开发者整理了Web性能优化的实战案例解析,结合官方文档深度扩展。内容涵盖点击响应时延核心指标(≤100ms)、性能分析工具链(如DevTools时间线、ArkUI Trace抓取)以及高频优化场景,包括递归函数优化、网络请求阻塞解决方案和setTimeout滥用问题等。同时提供进阶技巧,如首帧加速、透明动画陷阱规避及Web组件初始化加速,并通过优化前后Trace对比展示成果。最后总结了快速定位问题的方法与开发建议,助力开发者提升Web应用性能。
|
3月前
|
设计模式 人工智能 API
AI智能体开发实战:17种核心架构模式详解与Python代码实现
本文系统解析17种智能体架构设计模式,涵盖多智能体协作、思维树、反思优化与工具调用等核心范式,结合LangChain与LangGraph实现代码工作流,并通过真实案例验证效果,助力构建高效AI系统。
510 7
|
6月前
|
JSON 开发框架 自然语言处理
【HarmonyOS Next之旅】基于ArkTS开发(三) -> 兼容JS的类Web开发(三)
本文主要介绍了应用开发中的三大核心内容:生命周期管理、资源限定与访问以及多语言支持。在生命周期部分,详细说明了应用和页面的生命周期函数及其触发时机,帮助开发者更好地掌控应用状态变化。资源限定与访问章节,则聚焦于资源限定词的定义、命名规则及匹配逻辑,并阐述了如何通过 `$r` 引用 JS 模块内的资源。最后,多语言支持部分讲解了如何通过 JSON 文件定义多语言资源,使用 `$t` 和 `$tc` 方法实现简单格式化与单复数格式化,为全球化应用提供便利。
266 104
|
6月前
|
JavaScript 前端开发 API
【HarmonyOS Next之旅】基于ArkTS开发(三) -> 兼容JS的类Web开发(二)
本文介绍了HarmonyOS应用开发中的HML、CSS和JS语法。HML作为标记语言,支持数据绑定、事件处理、列表渲染等功能;CSS用于样式定义,涵盖尺寸单位、样式导入、选择器及伪类等特性;JS实现业务逻辑,包括ES6语法支持、对象属性、数据方法及事件处理。通过具体代码示例,详细解析了页面构建与交互的实现方式,为开发者提供全面的技术指导。
287 104
|
4月前
|
算法 程序员 API
电商程序猿开发实录:淘宝商品python(2)
本文分享了开发者在对接淘宝商品详情API过程中的真实经历,涵盖权限申请、签名验证、限流控制、数据解析及消息订阅等关键环节,提供了实用的Python代码示例,帮助开发者高效调用API,提升系统稳定性与数据处理能力。
|
5月前
|
数据采集 存储 数据库
Python爬虫开发:Cookie池与定期清除的代码实现
Python爬虫开发:Cookie池与定期清除的代码实现

推荐镜像

更多