IoService是对通信双方所进行的I/O操作的抽象,那么无论是在服务器端还是在客户端,都要进行I/O的读写操作,它们有一些共性,可以抽象出来。这里,我们主要详细说明IoAccectpr和IoConnector以及所基于的IoService抽象服务,都提供哪些操作和数据结构,都是如何构建的。首先,提供一个IoService服务接口相关的继承层次关系的类图,如图所示:
最终使用的Acceptor和Connector是上面继承层次中最下层的实现类。
IoService抽象
实际上,支持I/O操作服务的内容,集中在两个类中:IoService和AbstractIoService,看一下类图:
根据上图中IoService接口定义,我们给出接口中定义的方法,如下所示:
01 |
public interface IoService { |
02 |
void addListener(IoServiceListener listener); |
03 |
void removeListener(IoServiceListener listener); |
04 |
boolean isDisposing(); |
07 |
void dispose( boolean awaitTermination); |
08 |
IoHandler getHandler(); |
09 |
void setHandler(IoHandler handler); |
10 |
Map<Long, IoSession> getManagedSessions(); |
11 |
int getManagedSessionCount(); |
12 |
IoSessionConfig getSessionConfig(); |
13 |
IoFilterChainBuilder getFilterChainBuilder(); |
14 |
void setFilterChainBuilder(IoFilterChainBuilder builder); |
15 |
DefaultIoFilterChainBuilder getFilterChain(); |
17 |
long getActivationTime(); |
18 |
Set<WriteFuture> broadcast(Object message); |
19 |
IoSessionDataStructureFactory getSessionDataStructureFactory(); |
20 |
void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory); |
21 |
int getScheduledWriteBytes(); |
22 |
int getScheduledWriteMessages(); |
23 |
IoServiceStatistics getStatistics(); |
我们可以看到,IoService主要定义了两类服务,一类是提供I/O操作相关服务,另一类是会话 (IoSession)相关服务,这两类服务,无论是在服务端还是在客户端,都会提供,以此来保证双方通信。那么,具体地这两类服务中都包括哪些内容,我们总结如下:
- 管理IoService元数据,描述IoService本身,这些元数据都封装在TransportMetadata中,例如I/O 服务类型(如NIO,APR或RXTX),连接类型(如无连接接),地址类型等。
- 管理IoServiceListener,它是用来监听与一个IoService服务相关的事件的,比如服务的激活、会话的建立等 等,当然,这些监听服务不是提供给外部进行开发使用的,而是Mina内部使用的。
- 管理IoHandler,从Mina框架的架构我们知道,IoHandler的具体实现是与业务逻辑处理相关的,也是最靠近应用层的。
- 管理IoSession,即管理与一个IoService服务交互的会话对象,可以有一组会话同时使用该IoService服务。
- 管理IoFilter链,IoFilter链基于事件拦截模式,它位于IoHandler与IoService两层之间,Mina为 了方便使用IoFilter链,直接内置了一个IoFilterChainBuilder(具体实现为 DefaultIoFilterChainBuilder)。
- 管理一些相关的统计信息,如读写字节数、读写消息数、读写时间等。
上面类图中,AbstractIoService实现了IoService接口中定义的操作,同时增加了一些属性字段,可以通过这些字段看出,Mina框架IoService抽象服务层设计了哪些数据结构,用来辅助有关I/O操作的服务。我们通过如下几个方面来详述:
管理服务于IoService的IoServiceListener,主要是通过IoServiceListenerSupport类,这 个类中定义了如下结构:
1 |
private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>(); |
2 |
private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>(); |
3 |
private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions); |
当我们创建一个IoService实例时,可能是服务器端的IoAccectpr,也可能是客户端的IoConnector,可以分别通过调用如下两个方法来增加或者移除一个IoServiceListener:
1 |
void addListener(IoServiceListener listener); |
2 |
void removeListener(IoServiceListener listener); |
一个IoServiceListener定义如下操作:
1 |
public interface IoServiceListener extends EventListener { |
2 |
void serviceActivated(IoService service) throws Exception; |
3 |
void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception; |
4 |
void serviceDeactivated(IoService service) throws Exception; |
5 |
void sessionCreated(IoSession session) throws Exception; |
6 |
void sessionDestroyed(IoSession session) throws Exception; |
通过接口中定义的方法名,可以了解到,一个IoService监听器都负责监听哪些事件。
就像上面IoServiceListener与IoServiceListenerSupport的关系一样,IoFilter是通过另一 个工具类IoFilterChainBuilder来聚合起来,形成一个IoFilter链。通过实现IoFilterChainBuilder 接口的DefaultIoFilterChainBuilder可以对一组IoFilter进行创建。包含的数据结构如下所示:
1 |
private final List<Entry> entries; |
3 |
public DefaultIoFilterChainBuilder() { |
4 |
entries = new CopyOnWriteArrayList<Entry>(); |
其中Entry包装了一个IoFilter以及为其定义的名称。从IoFilterChainBuilder的名称来看,它只是关注一个 IoFilterChain如何创建,而不关心一组注册的IoFilter调用顺序,也不关心被指定事件被触发时调用哪个操作,这些逻辑是由 IoFilterChain来定义,并通过实现这个接口的DefaultIoFilterChain类实现的。当我们调用DefaultIoFilterChainBuilder 实例的有关操作IoFilter的方法,如下所示(在DefaultIoFilterChainBuilder中实现):
1 |
public synchronized void addFirst(String name, IoFilter filter); |
2 |
public synchronized void addLast(String name, IoFilter filter); |
3 |
public synchronized void addBefore(String baseName, String name, IoFilter filter); |
4 |
public synchronized void addAfter(String baseName, String name, IoFilter filter); |
实际上最终在调用构建的方法buildFilterChain的时候,将已经组织到DefaultIoFilterChainBuilder 实例中的多个IoFilter实例添加到已经构造的IoFilterChain中(如默认的DefaultIoFilterChain),一 个IoFilterChain实例可以在IoService实例运行时被使用,下面是buildFilterChain方法的逻辑:
1 |
public void buildFilterChain(IoFilterChain chain) throws Exception { |
2 |
for (Entry e : entries) { |
3 |
chain.addLast(e.getName(), e.getFilter()); |
也就是说,IoFilterChainBuilder是供使用Mina框架的开发网络应用程序的人员组织IoFilter链的,它只是一个运行前构建工具;而IoFilterChain是Mina框架运行服务所需要的,即是一个运行时辅助管理IoFilter链调用的工具。
每当有一个新的会话被创建,及使用了IoService提供的服务,就对应创建了一个IoSession实例,而且,与IoSession 相关的一些实时数据需要在内存中保存,以便IoService实例能够随时访问并对该会话实例提供需要的I/O读写服务。Mina定义了 IoSessionDataStructureFactory,来保存会话相关数据,这个结构提供了如下两个方法:
1 |
public interface IoSessionDataStructureFactory { |
2 |
IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception; |
3 |
WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception; |
可以看出,上面方法中的IoSessionAttributeMap和WriteRequestQueue都是与一个IoSession相 关的数据对象,我们可以看一下,这几个类之间的关系,如图所示:
与一个IoSession有关的数据,都在上面的结构中保存着。其中主要包含两类:一类是用户在启动会话时定义的属性集合,另一类是会话期 间可能需要进行读写操作。每个IoSession实例调用write方法的时候,都会对应这一个WriteRequest对象,封装了写请求数据。而提供I/O服务的IoService实例在运行时会把对应的WriteRequest对象放入/移出IoSessionDataStructureFactory 结构所持有的队列。
每个IoService都对应这一个Executor,用来处理被触发的I/O事件。
IoAcceptor与IoConnector抽象
IoAcceptor和IoConnector已经区分I/O操作相关的不同服务了,作为通信的服务器端和客户端,必然存在一些差异服务来维持各自在通信过程中的角色,比如,IoAcceptor需要监听指定服务端口,等待客户端的连接到服务器端,而IoConnector与服务器端进行通信,首先应该连接到服务器端Socket暴露的服务地址。下面,我们分别根据通信双方的这两种不同角色,来深入讨论一些细节。
从IoAcceptor接口定义,可以很好地看出它具有的一些基本操作,如下所示:
01 |
public interface IoAcceptor extends IoService { |
02 |
SocketAddress getLocalAddress(); |
03 |
Set<SocketAddress> getLocalAddresses(); |
04 |
SocketAddress getDefaultLocalAddress(); |
05 |
List<SocketAddress> getDefaultLocalAddresses(); |
06 |
void setDefaultLocalAddress(SocketAddress localAddress); |
07 |
void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses); |
08 |
void setDefaultLocalAddresses(Iterable<? extends SocketAddress> localAddresses); |
09 |
void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses); |
10 |
boolean isCloseOnDeactivation(); |
11 |
void setCloseOnDeactivation( boolean closeOnDeactivation); |
12 |
void bind() throws IOException; |
13 |
void bind(SocketAddress localAddress) throws IOException; |
14 |
void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException; |
15 |
void bind(SocketAddress... addresses) throws IOException; |
16 |
void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException; |
18 |
void unbind(SocketAddress localAddress); |
19 |
void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses); |
20 |
void unbind(Iterable<? extends SocketAddress> localAddresses); |
21 |
IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress); |
可以看到上面定义的方法中,主要是与IP地址相关的操作,主要包括绑定和解绑定,这些操作的实现是在该接口的抽象实现类AbstractIoAcceptor中给予实现的,在AbstractIoAcceptor中并没有涉及到有关SocketChannel的I/O操作,有关如何基于轮询的策略去检查SocketChannel是否有相应的事件被触发,这些I/O相关的操作被封装到AbstractPollingIoAcceptor类中。以基于TCP的NIO通信为例,具体接收客户端到来的连接请求,这些逻辑是在AbstractPollingIoAcceptor的实现类NioSocketAcceptor中实现的,这里创建了用来管理与客户端通信的NioSocketSession对象(它是IoSession的NIO实现)。
IoConnector的接口定义,如下所示:
01 |
public interface IoConnector extends IoService { |
02 |
int getConnectTimeout(); |
03 |
long getConnectTimeoutMillis(); |
04 |
void setConnectTimeout( int connectTimeout); |
05 |
void setConnectTimeoutMillis( long connectTimeoutInMillis); |
06 |
SocketAddress getDefaultRemoteAddress(); |
07 |
void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress); |
08 |
ConnectFuture connect(); |
09 |
ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer); |
10 |
ConnectFuture connect(SocketAddress remoteAddress); |
11 |
ConnectFuture connect(SocketAddress remoteAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer); |
12 |
ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); |
13 |
ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer); |
IoConnector定义的操作基本是与连接到服务端的。同样,AbstractIoConnector实现了Connector接口定义的基本操作。以基于TCP的NIO通信为例,客户端和服务端有部分操作非常类似,如轮询SocketChannel检查是否有事件触发,读写请求等,所以,客户端在AbstractIoConnector的抽象实现类AbstractPollingIoConnector中处理于此相关的逻辑。与NioSocketAcceptor对应,客户端有一个NioSocketConnector实现类。
通过上面IoAcceptor和IoConnector的说明,我们还不知道具体I/O操作是由谁来处理的。实际上,无论是服务端还是客户端,在处理轮询通道的抽象服务中,封装了一个IoProcessor抽象,它才是实际处理I/O操作的抽象部分。为了将通信的宏观抽象过程与通信过程中的处理细节分开,将IoProcessor独立出来,与宏观通信过程的逻辑解耦合。以基于TCP的NIO通信为例,在AbstractPollingIoAcceptor和AbstractPollingIoConnector中都有一个IoProcessor实例(这里是实现类NioProcessor的实例),通过调用它提供的处理操作来完成实际的I/O操作。