我们公司技术部门情况比较复杂,分到多个集团,每个集团又可能分为几个部门,每个部门又可能分为多个小组,组织架构比较复杂,开发人员比较多。
使用的编程语言也有点复杂,主流语言有.net(C#)、Java、PHP等。
所以SOA架构需要的是异构SOA。
有的同学可能说这个简单吗?“把部门合并扁平化合并为一个团队,把语言统一一种,要么.net要么Java。”
其实这样的简单粗暴并不能很好的解决问题的
首先公司组织架构就是不能随便修改的,一个公司的组织架构就是服务于这个公司的经营理念和营销模式,技术部门是服务机构并不直接产生价值,技术部门架构和公司组织架构高度一致能带来业务的高效性。
其次多语言技术体系也有其可取性
某个项目哪种语言能做的更快更好就用哪种语言
哪种语言的程序员好招,就多招一些,能在各种技术方向的变化中立于不败之地
现在继续说SOA,说起公司对SOA选型对于.net程序员开始还是一件挺悲催的事情,因为公司选的是dubbo
dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,说它是个伟大的开源项目并不为过,在很多互联网公司都有运用。
但是,dubbo是个Java项目,.net程序员就悲催了
为了更好的支持多语言的异构系统现状,具体选型是dubbox+ZooKeeper+Thrift,其中Thrift是facebook开发的高效RPC,支持语言非常多, C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml等。
有了Thrift,.net程序员的“春天”是不是就来了呢?
还是挺悲催,Java程序员几乎不用写额外代码配置一下就可以调用SOA服务或者发布服务,.net程序员要自己维护和ZooKeeper的通信和Thrift通信及日志统计和报送。
.net程序员苦不堪言,有些人质疑SOA选型(对.net程序员不公平),有些人"喊"着要.net程序员使用其他架构单干 ......
后来机缘巧合,.net的SOA这个事情就落在我的身上
领导把这个任务交给我的时候,我轻松的说没问题,但是时间证明这个事情比我原来想象的复杂得多,我也走了一些弯路,有过一些不太现实的想法,最终还是有了一个比较满意的结果
一、先说ZooKeeper
1、ZooKeeper是开源项目,其原理和作用这里不说,自行度娘
2、ZooKeeper的.net客户端使用nuget就可以安装使用
ZooKeeper客户端库也有很多开源项目支持,我这里选的是Apache基金会的官方版本
3、本地启动一个ZooKeeper来测试
ZooKeeper服务是Java开发的,ZooKeeper是个非常优秀的中间件,使用.net和Java调用区别并不大
这里查看ZooKeeper的工具也是Java开发的ZooInspector,正式环境我们有专门的后台来管理,本地调试ZooInspector就够用了。
二、再说Thrift
1、Thrift也是开源项目,其原理和作用这里不说,自行度娘
2、Thrift的.net库使用nuget就可以安装使用
Thrift客户端库也有很多开源项目支持,我这里还是选Apache基金会的官方版本
三、使用.net开发一个HelloWord服务
1、按Thrift的IDL规范定义接口Thrift文件
namespace java SOATest namespace csharp SOATest namespace php SOATest service HelloWorldService { string sayHello(1:string username) }
注:Thrift规范还是自行度娘
2、使用Thrift.exe生成代码
Thrift.exe使用方法可以使用Thrift的help指令查看,最好的方法还是自行度娘
3、到gen-csharp中找到刚生成的代码复制到项目中使用
/** * Autogenerated by Thrift Compiler (0.9.3) * * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING * @generated */ using System; using System.Collections; using System.Collections.Generic; using System.Text; using System.IO; using Thrift; using Thrift.Collections; using System.Runtime.Serialization; using Thrift.Protocol; using Thrift.Transport; namespace SOATest { public partial class HelloWorldService { public interface Iface { string sayHello(string username); #if SILVERLIGHT IAsyncResult Begin_sayHello(AsyncCallback callback, object state, string username); string End_sayHello(IAsyncResult asyncResult); #endif } public class Client : IDisposable, Iface { public Client(TProtocol prot) : this(prot, prot) { } public Client(TProtocol iprot, TProtocol oprot) { iprot_ = iprot; oprot_ = oprot; } protected TProtocol iprot_; protected TProtocol oprot_; protected int seqid_; public TProtocol InputProtocol { get { return iprot_; } } public TProtocol OutputProtocol { get { return oprot_; } } #region " IDisposable Support " private bool _IsDisposed; // IDisposable public void Dispose() { Dispose(true); } protected virtual void Dispose(bool disposing) { if (!_IsDisposed) { if (disposing) { if (iprot_ != null) { ((IDisposable)iprot_).Dispose(); } if (oprot_ != null) { ((IDisposable)oprot_).Dispose(); } } } _IsDisposed = true; } #endregion #if SILVERLIGHT public IAsyncResult Begin_sayHello(AsyncCallback callback, object state, string username) { return send_sayHello(callback, state, username); } public string End_sayHello(IAsyncResult asyncResult) { oprot_.Transport.EndFlush(asyncResult); return recv_sayHello(); } #endif public string sayHello(string username) { #if !SILVERLIGHT send_sayHello(username); return recv_sayHello(); #else var asyncResult = Begin_sayHello(null, null, username); return End_sayHello(asyncResult); #endif } #if SILVERLIGHT public IAsyncResult send_sayHello(AsyncCallback callback, object state, string username) #else public void send_sayHello(string username) #endif { oprot_.WriteMessageBegin(new TMessage("sayHello", TMessageType.Call, seqid_)); sayHello_args args = new sayHello_args(); args.Username = username; args.Write(oprot_); oprot_.WriteMessageEnd(); #if SILVERLIGHT return oprot_.Transport.BeginFlush(callback, state); #else oprot_.Transport.Flush(); #endif } public string recv_sayHello() { TMessage msg = iprot_.ReadMessageBegin(); if (msg.Type == TMessageType.Exception) { TApplicationException x = TApplicationException.Read(iprot_); iprot_.ReadMessageEnd(); throw x; } sayHello_result result = new sayHello_result(); result.Read(iprot_); iprot_.ReadMessageEnd(); if (result.__isset.success) { return result.Success; } throw new TApplicationException(TApplicationException.ExceptionType.MissingResult, "sayHello failed: unknown result"); } } public class Processor : TProcessor { public Processor(Iface iface) { iface_ = iface; processMap_["sayHello"] = sayHello_Process; } protected delegate void ProcessFunction(int seqid, TProtocol iprot, TProtocol oprot); private Iface iface_; protected Dictionary<string, ProcessFunction> processMap_ = new Dictionary<string, ProcessFunction>(); public bool Process(TProtocol iprot, TProtocol oprot) { try { TMessage msg = iprot.ReadMessageBegin(); ProcessFunction fn; processMap_.TryGetValue(msg.Name, out fn); if (fn == null) { TProtocolUtil.Skip(iprot, TType.Struct); iprot.ReadMessageEnd(); TApplicationException x = new TApplicationException (TApplicationException.ExceptionType.UnknownMethod, "Invalid method name: '" + msg.Name + "'"); oprot.WriteMessageBegin(new TMessage(msg.Name, TMessageType.Exception, msg.SeqID)); x.Write(oprot); oprot.WriteMessageEnd(); oprot.Transport.Flush(); return true; } fn(msg.SeqID, iprot, oprot); } catch (IOException) { return false; } return true; } public void sayHello_Process(int seqid, TProtocol iprot, TProtocol oprot) { sayHello_args args = new sayHello_args(); args.Read(iprot); iprot.ReadMessageEnd(); sayHello_result result = new sayHello_result(); result.Success = iface_.sayHello(args.Username); oprot.WriteMessageBegin(new TMessage("sayHello", TMessageType.Reply, seqid)); result.Write(oprot); oprot.WriteMessageEnd(); oprot.Transport.Flush(); } } #if !SILVERLIGHT [Serializable] #endif public partial class sayHello_args : TBase { private string _username; public string Username { get { return _username; } set { __isset.username = true; this._username = value; } } public Isset __isset; #if !SILVERLIGHT [Serializable] #endif public struct Isset { public bool username; } public sayHello_args() { } public void Read (TProtocol iprot) { iprot.IncrementRecursionDepth(); try { TField field; iprot.ReadStructBegin(); while (true) { field = iprot.ReadFieldBegin(); if (field.Type == TType.Stop) { break; } switch (field.ID) { case 1: if (field.Type == TType.String) { Username = iprot.ReadString(); } else { TProtocolUtil.Skip(iprot, field.Type); } break; default: TProtocolUtil.Skip(iprot, field.Type); break; } iprot.ReadFieldEnd(); } iprot.ReadStructEnd(); } finally { iprot.DecrementRecursionDepth(); } } public void Write(TProtocol oprot) { oprot.IncrementRecursionDepth(); try { TStruct struc = new TStruct("sayHello_args"); oprot.WriteStructBegin(struc); TField field = new TField(); if (Username != null && __isset.username) { field.Name = "username"; field.Type = TType.String; field.ID = 1; oprot.WriteFieldBegin(field); oprot.WriteString(Username); oprot.WriteFieldEnd(); } oprot.WriteFieldStop(); oprot.WriteStructEnd(); } finally { oprot.DecrementRecursionDepth(); } } public override string ToString() { StringBuilder __sb = new StringBuilder("sayHello_args("); bool __first = true; if (Username != null && __isset.username) { if(!__first) { __sb.Append(", "); } __first = false; __sb.Append("Username: "); __sb.Append(Username); } __sb.Append(")"); return __sb.ToString(); } } #if !SILVERLIGHT [Serializable] #endif public partial class sayHello_result : TBase { private string _success; public string Success { get { return _success; } set { __isset.success = true; this._success = value; } } public Isset __isset; #if !SILVERLIGHT [Serializable] #endif public struct Isset { public bool success; } public sayHello_result() { } public void Read (TProtocol iprot) { iprot.IncrementRecursionDepth(); try { TField field; iprot.ReadStructBegin(); while (true) { field = iprot.ReadFieldBegin(); if (field.Type == TType.Stop) { break; } switch (field.ID) { case 0: if (field.Type == TType.String) { Success = iprot.ReadString(); } else { TProtocolUtil.Skip(iprot, field.Type); } break; default: TProtocolUtil.Skip(iprot, field.Type); break; } iprot.ReadFieldEnd(); } iprot.ReadStructEnd(); } finally { iprot.DecrementRecursionDepth(); } } public void Write(TProtocol oprot) { oprot.IncrementRecursionDepth(); try { TStruct struc = new TStruct("sayHello_result"); oprot.WriteStructBegin(struc); TField field = new TField(); if (this.__isset.success) { if (Success != null) { field.Name = "Success"; field.Type = TType.String; field.ID = 0; oprot.WriteFieldBegin(field); oprot.WriteString(Success); oprot.WriteFieldEnd(); } } oprot.WriteFieldStop(); oprot.WriteStructEnd(); } finally { oprot.DecrementRecursionDepth(); } } public override string ToString() { StringBuilder __sb = new StringBuilder("sayHello_result("); bool __first = true; if (Success != null && __isset.success) { if(!__first) { __sb.Append(", "); } __first = false; __sb.Append("Success: "); __sb.Append(Success); } __sb.Append(")"); return __sb.ToString(); } } } }
注:强烈建议大家别去修改Thrift生成的代码
4、新建类实现生成代码的服务接口(实际逻辑调用类)
实现接口HelloWorldService.Iface
public class HelloWorldImp : HelloWorldService.Iface { public string sayHello(string username) { if (string.IsNullOrWhiteSpace(username)) return null; string msg = string.Concat("Hello ", username); Console.WriteLine(msg); return msg; } }
5、发布并注册服务到ZooKeeper
public class ServeTest { public static void Test() { ZKConsumer zooKeeper = ZKInit(); string serviceName = "com.fang.HelloWorld$Iface";//服务名 HelloWorldService.Iface service = new HelloWorldImp();//服务实现逻辑 string serviceIp = "192.168.109.166";//发布服务使用ip int servicePort = 5000;//发布服务使用端口 string group = "kg";//应用程序分组 string serviceVersion = "1.0.0";//服务版本 int serviceTimeOut = 5000; //服务超时阈值(单位Millisecond) int alertElapsed = 3000; //服务执行耗时监控报警阈值(单位Millisecond) int alertFailure = 10; //服务每分钟出错次数监控报警阈值 //注册并发布服务 zooKeeper.RegistService<HelloWorldService.Iface>(serviceName, service, serviceIp, servicePort, group, serviceVersion, serviceTimeOut, alertElapsed, alertFailure); } /// <summary> /// 初始化zooKeeper /// </summary> /// <returns></returns> private static ZKConsumer ZKInit() { ZKConsumer zooKeeper = new ZKConsumer(); zooKeeper.Connectstring = "192.168.109.166:2181"; zooKeeper.Logger = Fang.Log.Loger.CreateDayLog("ServeTest"); zooKeeper.Init(); return zooKeeper; } }
注:其中ZKConsumer就是我定义的和ZooKeeper交互的类,也几乎是.net SOA直接交互的唯一一个类,使用起来是不是非常简单,其实实现还是比较复杂的,随后再说
6、启动服务看一下
6.1 其实执行时候就是开了一个socket监听,很简单
6.2 看一下日志信息
11:46:21 ZooKeeper Init 11:46:21 ZooKeeper Connect 11:46:21 ZK触发了None事件(path:)! 11:46:21 ZooKeeper CONNECTED 11:46:21 Collecter Start 11:46:21 Consumer Subcribe:/dubbo/com.alibaba.dubbo.monitor.gen.thrift.MonitorService%24Iface/providers 11:46:22 Collecter Run 11:46:22 Collecter OnFail 11:46:27 Collecter Run 11:46:27 Collecter OnFail
以上是日志文件,有ZooKeeper连接信息和订阅日志收集服务信息及日志收集信息
日志收集是一个线程调度,由于还没有连接没有日志,所以Collecter都是Fail
6.3 看一下ZooKeeper的变化
ZooKeeper在dubbo节点下多出了一个节点"com.fang.HelloWorld%24Iface"及其多个子节点,其中重点是其providers子节点下有一个很长的节点,那个节点就是表示当前服务信息的,如果服务关闭,这个信息也会消失
都在dubbo下不难理解,因为我们选型就是dubbo,.net要兼容dubbo的一些特性
四、做个客户端来调用HelloWorld服务
1、使用Thrift.exe生成代码并复制到项目中
服务端和客户端生成代码是没有区别的,这个就不再展开,需要再了解参考服务端生成代码部分
2、调用调用HelloWorld服务的源码
public class HelloWorldTest { public static void Test() { ZKConsumer zooKeeper = ZKInit(); Subcribe(zooKeeper);//订阅com.fang.HelloWorld string str = null; do { str = Console.ReadLine(); if (string.Equals(str, "Exit", StringComparison.CurrentCultureIgnoreCase)) return; Console.WriteLine("callDemo"); CallService();//调用服务 } while (true); } /// <summary> /// 订阅AskSearchService /// </summary> /// <param name="zooKeeper"></param> private static void Subcribe(ZKConsumer zooKeeper) { string serviceName = "com.fang.HelloWorld$Iface";//服务名 string serviceGroup = "kg";//服务分组 string serviceVersion = "1.0.0.0";//服务版本 int serviceTimeOut = 5000; //服务超时阈值(单位Millisecond) int alertElapsed = 3000; //服务执行耗时监控报警阈值(单位Millisecond) int alertFailure = 10; //服务每分钟出错次数监控报警阈值 //订阅服务 bool state = zooKeeper.SubcribeService<HelloWorldService.Iface>(serviceName, serviceGroup, serviceVersion, serviceTimeOut, alertElapsed, alertFailure); Console.WriteLine(string.Concat("SubcribeService(", serviceName, ") is ", state.ToString())); } /// <summary> /// 初始化zooKeeper /// </summary> /// <returns></returns> private static ZKConsumer ZKInit() { ZKConsumer zooKeeper = new ZKConsumer(); zooKeeper.Connectstring = "192.168.109.166:2181"; zooKeeper.Logger = Fang.Log.Loger.CreateDayLog("HelloWorldTest"); zooKeeper.Init(); return zooKeeper; } /// <summary> /// 调用服务 /// </summary> private static void CallService() { using (var resource = ZKConsumer.GetServiceByContainer<HelloWorldService.Iface>()) { HelloWorldService.Iface service = resource.Service; if (service == null) Console.WriteLine("service is null"); string results = null; try { results = service.sayHello("Word"); } catch (Exception ex) { Console.WriteLine(ex.Message); } if (results != null) Console.WriteLine(results.ToString()); } } }
注:以上看上去洋洋洒洒几十行,貌似很复杂,其实不然
3、以上代码解析
2.1 初始化和订阅服务
ZKInit是初始化ZooKeeper的,很简单
Subcribe是订阅服务,看上去很复杂,其实就是一行代码,只是为了便于理解拆分写成这样
以上初始化对于每个应用程序都只需要一次
web应用程序(站点)可以在Global.asax的Application_Start中初始化一次,也可以配置一个IHttpModule来初始化(在Init)
控制台和windows服务在Main方法中的开始部分初始化即可
2.2 Test是便于测试写了一个while循环,实际开发可以无视
2.3 CallService是实际调用服务代码
核心就是一个using及其中的GetServiceByContainer方法,及调用sayHello方法,其他都是安全检测异常处理测试代码,算下来核心代码也就是两三行
应该说还是挺简单的吧,当然没有java同学用dubbo简单,但至少这里封装了ZooKeeper、Thrift和日志等。让大家尽量少和业务无关的东西打交道
4、运行测试一下
4.1 执行之后效果如下
4.2 看一下ZooKeeper的变化
这次在consumers下增加了一个很长的节点,证明客户端和服务端都和ZooKeeper连接上了
5、调用几次试试
5.1 客户端截图
5.2 服务端截图
以上测试证明是服务端和客户端通信没有问题
实际上我和Java的同事也联测了,Java调用.net的服务也没有问题,.net调用dubbo(Java)的服务也没有问题
另外,多个服务端和多个客户端也是测试通过了,限于篇幅这个就不再举例
五、主要源码解析
1、项目截图
2、逐个解析一下
2.1 ApplicationInfo很简单就是读取一些应用程序配置信息
AppSettings["ZooKeeperConnectstring"]是ZooKeeper连接地址
AppSettings["ApplicationName"]是应用程序名用来程序定位及服务依赖关系图绘制
AppSettings["ApplicationOwner"]是项目负责人等
2.2 Collecter收集日志的逻辑及其线程调度
2.3 Connecter用来连接ZooKeeper及维护ZooKeeper连接(重连)
2.4 ConsumAop是客户AOP拦截,记录日志到队列
2.5 Consumer用来客户服务订阅及服务路由管理
2.6 HostStat用于服务主机信息解析
2.7 ISubcribe是ZooKeeper订阅接口
2.8 MethodReport是方法执行日志
2.9 Monitor是日志定时报送作业(线程调度)
2.10 Provider用于服务端启动Socket服务及注册到ZooKeeper
2.11 ReportAopHandler是方法执行Aop拦截,用于服务端执行拦截,ConsumAop继承该类
2.12 ReportStorage日志存储器,并维护一个Collecter和一个Monitor线程调度
2.13 ServiceConcurrent用于服务执行并发统计(客户端及服务端)
2.14 ServiceConfig是客户端和服务端的公共配置
2.15 ServiceFactor是客户端服务工厂及Socket连接池调用
2.16 ServiceHost是单个服务主机的Socket工厂及连接池
2.17 ServiceHostManager用于服务主机集群管理
2.18 ServiceResource用于服务资源管理,现在用于回收Socket主机(以后可能要做成服务对象也可以回收)
2.19 ServiceSocket,本来用于维护Socket连接,重连的,现在只是Socket包装类
2.20 Statistics用于日志统计汇总
2.21 ZKConsumer是ZooKeeper消费者,用来维护ZooKeeper连接及基于ZooKeeper的功能
注:另外Aop、容器、资源池。线程调度、类型转化等来源于面向接口主框架
以上功能虽然可以达到.net使用和dubbo兼容的服务功能,但是离dubbo在功能和稳定性上还有差距,这个建设过程需要持续下去
最后,我畅想到一个梦境。一个.net小伙深情的望着Java小姑娘说,我做好准备了,我们做朋友吧。Java小姑娘点点头。此时响起了优美的华尔兹。.net和Java手拉手在舞池里翩翩起舞...