前言
什么是发布订阅模式呐? 基于一个事件(主题)通道,希望接收通知的对象 Subscriber
通过自定义事件订阅主题,被激活事件的对象 Publisher
通过发布主题事件的方式通知各个订阅该主题的 Subscriber
对象。
举个通俗的栗子——追剧。某平台上线了一部小包特别喜欢的电视剧,每时每刻都想看到最新进度,但打工人小包还是非常繁忙的,总不能每时每刻刷新平台吧。平台发现了这个问题,提供了订阅功能,小包选择订阅该电视剧,更新后,平台便会第一时间发送消息通知小包。小包便可以愉快的追剧了。
上述案例中,电视剧就是发布者 Publisher
,小包就是订阅者 Subscriber
,平台则承担了事件通道(Event Channel
) 中介作用。
前几个月,小包写了一篇 观察者模式 vs 发布订阅模式,千万不要再混淆了 ,通过武侠的角度讲解了观察者模式与发布订阅模式的区别,衍化的方式有可能增加了某些方面的理解成本,文章也引起了部分争议,小包感觉当初的发布订阅模式代码实现也并不完美。
恰巧小包最近在学习 nodejs
,nodejs
提供了 event.EventEmitter
模块,该模块的核心就是事件触发与事件监听器功能的封装。基于 EventEmitter
模块可以比较便捷的实现发布订阅模式,因此小包决定吸收 EventEmitter
的源码精髓,完善发布订阅模式。
学习本文,你能收获:
- 🌟 掌握发布订阅模式
- 🌟 了解
Node
中EventEmitter
的实现及使用 - 🌟 掌握手写发布订阅模式
EventEmitter
首先小包就带大家阅读一下 EventEmitter
的源码,源码内容非常多,小包本文只讲解有关于发布订阅部分的代码。
init 方法
发布订阅模式中有三大对象,事件(主题)通道负责维护某一事件下的处理函数队列。因此我们首先需要维护一个事件通道,将其定义在构造函数中。
// 事件通道的存储格式 const EventChannel = { event1: [func1, func2], event2: [func3, func4], }; 复制代码
EventEmitter
使用 EventEmitter.init
方法初始化事件通道属性,可以发现 init
方法中并没有直接将 _events
初始化为 {}
,而是初始化为 ObjectCreate(null) —— Object.create
。
那为什么会这样实现呐?Object.create(null)
创建的空对象没有原型方法,是纯粹的对象,可以避免原型的污染。而对象字面量 {}
创造的空对象和 new Object()
方式是相同的,可以继承 Object
对象的属性。
function EventEmitter(opts) { EventEmitter.init.call(this, opts); } EventEmitter.init = function (opts) { if ( this._events === undefined || this._events === ObjectGetPrototypeOf(this)._events ) { this._events = ObjectCreate(null); this._eventsCount = 0; } }; 复制代码
addListener/on
addListener/on
方法是为指定事件注册一个监听器,接受一个字符串event
和一个回调函数。
很有意思的是,EventEmitter
提供了两对实现订阅和取消订阅的方法:
addListener/on
和 removeListener/off
。在学习该模块时,小包还特地纠结了一下,但是阅读到源码时,一切豁然开朗:这两对方法本质都是相同的。
EventEmitter.prototype.on = EventEmitter.prototype.addListener; EventEmitter.prototype.off = EventEmitter.prototype.removeListener; 复制代码
on
方法在内部是基于 _addListener
方法,因此小包主要解读 _addListener
方法,下面先来铺垫一下源码中出现的 newListener
及 prepend
属性。
知识 1:newListener 事件
newListener
是 nodejs
中人为规定的事件,该事件在添加新监视器时被触发。使用方式与普通绑定监视器相同,只不过监视名强制设定为 newListener
。
var events = require("events"); var eventEmitter = new events.EventEmitter(); eventEmitter.on("newListener", () => { console.log("绑定了新事件"); }); eventEmitter.on("click", () => { console.log("click"); }); // 输出结果:绑定了新事件 复制代码
知识 2:prepend 属性
prepend
中文含义是预置或者前置,该属性控制的是同一事件不同处理函数的顺序问题。我们来举个栗子:(该属性并没有暴露给外部使用)
// prepend 为 false event.on("click", fn1); event.on("click", fn2); event.on("click", fn3); // 那么此时事件通道中 click 事件的三个处理函数应该是自上往下的 { click: [fn1, fn2, fn3]; } 复制代码
// prepend 为 true // 这里只是为了举栗子 event.on("click", fn1, true); event.on("click", fn2, true); event.on("click", fn3, true); // 那么此时事件通道中 click 事件的三个处理函数应该是自上往下的 { click: [fn3, fn2, fn1]; } 复制代码
下面来解读源码:
Step1: 获取事件通道及待注册事件的监听器
events = target._events; // 判断事件通道是否存在 if (events === undefined) { events = target._events = ObjectCreate(null); } else { // 如果已经注册了 newListener 事件,后续注册事件前都会触发 newListener 事件 if (events.newListener !== undefined) { target.emit( "newListener", type, // 这里等到 once 部分做详解 listener.listener ? listener.listener : listener ); events = target._events; } // 获取该事件的监听器 existing = events[type]; } 复制代码
Step2: 给该事件添加新的监听器
// 此前未有该事件的订阅出现 if (existing === undefined) { // 源码认为如果只有一个处理函数,没有必要声明数组 events[type] = listener; } else { if (typeof existing === "function") { // 将新处理函数压入到数组中 // prepend 决定压入顺序 existing = events[type] = prepend ? [listener, existing] : [existing, listener]; } else if (prepend) { existing.unshift(listener); } else { existing.push(listener); } } 复制代码
removeListener/off
removeListener/off
是移除指定事件的某个监听器,监听器必须是该事件已经注册过的监听器。
与 newListener
事件相对应,nodejs
也设置了 removeListener
事件,当删除监听器时触发该事件。
移除监听器的代码比较简单,我们直接在源码上进行注释讲解。
EventEmitter.prototype.removeListener = function removeListener( type, listener ) { const events = this._events; // 没有事件通道 if (events === undefined) return this; const list = events[type]; // 该事件未注册处理函数 if (list === undefined) return this; // 当前事件只有一个监听器 // 这里处理了两种情况,on 注册监听器的删除及 once 注册监听器的删除,once 处会详细讲到 if (list === listener || list.listener === listener) { delete events[type]; // 触发 removeListener 事件 if (events.removeListener) this.emit("removeListener", type, list.listener || listener); // 从数组中删除监听器 } else if (typeof list !== "function") { for (let i = list.length - 1; i >= 0; i--) { if (list[i] === listener || list[i].listener === listener) { position = i; break; } } if (position < 0) return this; if (position === 0) list.shift(); else { if (spliceOne === undefined) spliceOne = require("internal/util").spliceOne; spliceOne(list, position); } // 如果只有一个监听器,无需使用数组存储 if (list.length === 1) events[type] = list[0]; if (events.removeListener !== undefined) this.emit("removeListener", type, listener); } return this; }; 复制代码
once
once
为指定事件注册一个单次监听器,即监听器最多只会触发一次,触发后立刻解除该监听器。
once
处有个坑,我们需要注意,once
执行一次后会解除监听器,但我们同样可以在 once
的事件执行前解除此监听器,因此 once
处我们要处理两种情况。
Case1:监听器执行完毕后解除
once
方法与 on
方法的区别在于 once
只执行监听器一次然后移除,因此我们设计 once
时候可以借用 on
方法,传入一个包含监听器方法及移除该监听器的包裹函数 wrapFn
。
eventEmitter.on(event, (...args) => { listener(...args); eventEmitter.off(event, listener); }); 复制代码
Case2:调用 removeListener/off 方法解除监听器
如果直接调用 removeListener/off
移除监听器,则与 on
方法添加的监听器移除是类似的,但 Case1
中,我们监听的是当前监听器与移除监听器的包裹函数 wrapFn
,调用 removeListener/off
移除方法时,我们传入的是 listener
监听器方法,所以无法删除成功。
因此我们为了适应这种情况,给包裹函数 wrapFn
身上挂载一个标识,标识值是监听器(wrapFn.listener = listener)。因此我们在调用移除方法时,同时判断 listener
与 listener.listener
即可。
once 方法源码:
EventEmitter.prototype.once = function once(type, listener) { checkListener(listener); // 调用了 _onceWrap 方法,这里实现了上面的包裹功能 this.on(type, _onceWrap(this, type, listener)); return this; }; 复制代码
function _onceWrap(target, type, listener) { const state = { fired: false, wrapFn: undefined, target, type, listener }; const wrapped = onceWrapper.bind(state); // Case2: 调用 off 方法移除监听器,在包裹函数上挂载listener wrapped.listener = listener; state.wrapFn = wrapped; return wrapped; } 复制代码
function onceWrapper() { if (!this.fired) { // Case1 监听器执行与监听器移除 this.target.removeListener(this.type, this.wrapFn); this.fired = true; if (arguments.length === 0) return this.listener.call(this.target); return this.listener.apply(this.target, arguments); } } 复制代码
emit 方法
emit
方法按监听器的顺序执行执行每个监听器,如果事件有注册监听返回true
,否则返回false
。
emit
方法实现起来比较简单,获取对应事件的监听器,传入参数执行即可。
EventEmitter.prototype.emit = function emit(type, ...args) { const events = this._events; if (events !== undefined) { // 获取监听器,监听器有三种情况: 1.没有(返回false) 2.只有一个(函数形式) 3.多个(数组形式) const handler = events[type]; // Case1 没有值 if (handler === undefined) return false; // Case2 函数形式 if (typeof handler === "function") { const result = ReflectApply(handler, this, args); } else { // Case3 数组形式 const len = handler.length; const listeners = arrayClone(handler); for (let i = 0; i < len; ++i) { const result = ReflectApply(listeners[i], this, args); } } return true; }; 复制代码
源码收获
上面小包带领大家阅读了 Nodejs EventEmitter
模块的部分源码,我们能从中学到那些东西来完善我们的发布订阅模式呐?
- 初始值使用
Object.create(null)
可以避免原型污染 - 事件只存在一个监听器时,无需使用数组
once
方法的两种情况处理off
方法边界情况的处理及两种删除情况的处理
发布订阅实现
有了阅读源码的基础,我们就可以来实现完善的发布订阅模式。
EventEmitter 构造函数
function EventEmitter() { this._events = Object.create(null); } 复制代码
on 方法
EventEmitter.prototype.on = function (type, listener) { // 获取事件通道 let events = this._events; if (events === undefined) { events = this._events = Object.create(null); } // 判断是否监听了 newListener 事件,如果监听则执行 newListener 的回调函数 if (type !== "newListener") { if (events.newListener) { ethis.emit("newListener", type); } } // 对于单个监听器是否使用数组小包认为影响不大,因此小包继续使用数组 if (!events[type]) { events[type] = [listener]; } else { events[type].push(listener); } }; 复制代码
off 方法
off
方法我们要处理好 on
注册监听器的移除及 once
注册监听器的移除,同时做好边界情况处理。
EventEmitter.prototype.off = function (type, listener) { const events = this._events; // 边界情况 if (events === undefined) { return this; } const listenerList = events[type]; if (listenerList === undefined) { return this; } // 处理两种情况 events[type] = events[type].filter((fn) => { return fn !== listener && fn.listener !== listener; }); }; 复制代码
once 方法
源码处我们讲过,once
要处理两种情况。
EventEmitter.prototype.once = function (type, listener) { // 监听器执行后移除 const onceApply = (...args) => { listener.call(this, ...args); this.off(type, listener); }; // 绑定标识,标识为 listener onceApply.listener = listener; // 注册监听器 this.on(type, onceApply); }; 复制代码
emit 方法
EventEmitter.prototype.emit = function (type, ...args) { const events = this._events[type]; // 边界情况处理 if (events === undefined) { return false; } const handler = events[type]; if (handler === undefined) { return false; } // 执行 emit 事件对应的监听器 handler.forEach((fn) => { fn.call(this, ...args); }); return true; };