wcf系列学习5天速成——第四天 wcf之分布式架构

简介:

今天是wcf系列的第四天,也该出手压轴戏了。嗯,现在的大型架构,都是神马的,

nginx鸡群,iis鸡群,wcf鸡群,DB鸡群,由一个人作战变成了群殴.......

 

今天我就分享下wcf鸡群,高性能架构中一种常用的手法就是在内存中维护一个叫做“索引”的内存数据库,

在实战中利用“索引”这个概念做出"海量数据“的秒杀。

好,先上图:

 

这个图明白人都能看得懂吧。因为我的系列偏重于wcf,所以我重点说下”心跳检测“的实战手法。

 

第一步:上一下项目的结构,才能做到心中有数。

 

第二步:“LoadDBService”这个是控制台程序,目的就是从数据库抽出关系模型加载在内存数据库中,因为这些东西会涉及一些算法的知识,

             在这里就不写算法了,就简单的模拟一下。

LoadDBServcie
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization;
using System.Web.Script.Serialization;
using System.IO;
using System.Xml.Serialization;
using System.Xml;
using Common;

namespace LoadDBData
{
    class Program
    {
        static void Main(string[] args)
        {
            //模拟从数据库加载索引到内存中,形成内存中的数据库
//这里的 "Dictionary" 用来表达“一个用户注册过多少店铺“,即UserID与ShopID的一对多关系
            SerializableDictionary<int, List<int>> dic = new SerializableDictionary<int, List<int>>();
            
            List<int> shopIDList = new List<int>();

            for (int shopID = 300000; shopID < 300050; shopID++)
                shopIDList.Add(shopID);
            
            int UserID = 15;

            //假设这里已经维护好了UserID与ShopID的关系
            dic.Add(UserID, shopIDList);
            
            XmlSerializer xml = new XmlSerializer(dic.GetType());
            
            var memoryStream = new MemoryStream();
            
            xml.Serialize(memoryStream, dic);
            
            memoryStream.Seek(0, SeekOrigin.Begin);
            
            //将Dictionary持久化,相当于模拟保存在Mencache里面
            File.AppendAllText("F://1.txt", Encoding.UTF8.GetString(memoryStream.ToArray()));
            
            Console.WriteLine("数据加载成功!");
            
            Console.Read();
        }
    }
}

因为Dictionary不支持序列化,所以我从网上拷贝了一份代码让其执行序列化
SerializableDictionary
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Xml.Serialization;
using System.Xml;
using System.Xml.Schema;
using System.Runtime.Serialization;

namespace Common
{
    ///<summary>
/// 标题:支持 XML 序列化的 Dictionary
///</summary>
///<typeparam name="TKey"></typeparam>
///<typeparam name="TValue"></typeparam>
    [XmlRoot("SerializableDictionary")]
    public class SerializableDictionary<TKey, TValue> : Dictionary<TKey, TValue>, IXmlSerializable
    {

        public SerializableDictionary()
            : base()
        {
        }
        public SerializableDictionary(IDictionary<TKey, TValue> dictionary)
            : base(dictionary)
        {
        }

        public SerializableDictionary(IEqualityComparer<TKey> comparer)
            : base(comparer)
        {
        }

        public SerializableDictionary(int capacity)
            : base(capacity)
        {
        }
        public SerializableDictionary(int capacity, IEqualityComparer<TKey> comparer)
            : base(capacity, comparer)
        {
        }
        protected SerializableDictionary(SerializationInfo info, StreamingContext context)
            : base(info, context)
        {
        }


        public System.Xml.Schema.XmlSchema GetSchema()
        {
            return null;
        }
        ///<summary>
/// 从对象的 XML 表示形式生成该对象
///</summary>
///<param name="reader"></param>
        public void ReadXml(System.Xml.XmlReader reader)
        {
            XmlSerializer keySerializer = new XmlSerializer(typeof(TKey));
            XmlSerializer valueSerializer = new XmlSerializer(typeof(TValue));
            bool wasEmpty = reader.IsEmptyElement;
            reader.Read();
            if (wasEmpty)
                return;
            while (reader.NodeType != System.Xml.XmlNodeType.EndElement)
            {
                reader.ReadStartElement("item");
                reader.ReadStartElement("key");
                TKey key = (TKey)keySerializer.Deserialize(reader);
                reader.ReadEndElement();
                reader.ReadStartElement("value");
                TValue value = (TValue)valueSerializer.Deserialize(reader);
                reader.ReadEndElement();
                this.Add(key, value);
                reader.ReadEndElement();
                reader.MoveToContent();
            }
            reader.ReadEndElement();
        }

        /**/
        ///<summary>
/// 将对象转换为其 XML 表示形式
///</summary>
///<param name="writer"></param>
        public void WriteXml(System.Xml.XmlWriter writer)
        {
            XmlSerializer keySerializer = new XmlSerializer(typeof(TKey));
            XmlSerializer valueSerializer = new XmlSerializer(typeof(TValue));
            foreach (TKey key in this.Keys)
            {
                writer.WriteStartElement("item");
                writer.WriteStartElement("key");
                keySerializer.Serialize(writer, key);
                writer.WriteEndElement();
                writer.WriteStartElement("value");
                TValue value = this[key];
                valueSerializer.Serialize(writer, value);
                writer.WriteEndElement();
                writer.WriteEndElement();
            }
        }

    }
}


第三步: "HeartBeatService"也做成了一个控制台程序,为了图方便,把Contract和Host都放在一个控制台程序中,

            代码中加入了注释,看一下就会懂的。

           

IAddress.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Text;

namespace HeartBeatService
{
    //CallbackContract:这个就是Client实现此接口,方便服务器端通知客户端
    [ServiceContract(CallbackContract = typeof(ILiveAddressCallback))]
    public interface IAddress
    {
        ///<summary>
/// 此方法用于Search启动后,将Search地址插入到此处
///</summary>
///<param name="address"></param>
        [OperationContract(IsOneWay = true)]
        void AddSearch(string address);

        ///<summary>
/// 此方法用于IIS端获取search地址
///</summary>
///<param name="address"></param>
        [OperationContract(IsOneWay = true)]
        void GetService(string address);
    }
}

 
Address.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Text;
using System.Timers;
using System.IO;
using System.Collections.Concurrent;
using SearhService;
using ClientService;

namespace HeartBeatService
{
    //InstanceContextMode:主要是管理上下文的实例,此处是single,也就是单体
//ConcurrencyMode:    主要是用来控制实例中的线程数,此处是Multiple,也就是多线程
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class Address : IAddress
    {
        static List<string> search = new List<string>();

        static object obj = new object();

        ///<summary>
/// 此静态构造函数用来检测存活的Search个数
///</summary>
        static Address()
        {
            Timer timer = new Timer();
            timer.Interval = 6000;
            timer.Elapsed += (sender, e) =>
            {

                Console.WriteLine("\n***************************************************************************");
                Console.WriteLine("当前存活的Search为:");

                lock (obj)
                {
                    //遍历当前存活的Search
                    foreach (var single in search)
                    {
                        ChannelFactory<IProduct> factory = null;

                        try
                        {
                            //当Search存在的话,心跳服务就要定时检测Search是否死掉,也就是定时的连接Search来检测。
                            factory = new ChannelFactory<IProduct>(new NetTcpBinding(SecurityMode.None), new EndpointAddress(single));
                            factory.CreateChannel().TestSearch();
                            factory.Close();

                            Console.WriteLine(single);

                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);

                            //如果抛出异常,则说明此search已经挂掉
                            search.Remove(single);
                            factory.Abort();
                            Console.WriteLine("\n当前时间:" + DateTime.Now + " ,存活的Search有:" + search.Count() + "个");
                        }
                    }
                }

                //最后统计下存活的search有多少个
                Console.WriteLine("\n当前时间:" + DateTime.Now + " ,存活的Search有:" + search.Count() + "个");
            };
            timer.Start();
        }

        public void AddSearch(string address)
        {

            lock (obj)
            {
                //是否包含相同的Search地址
                if (!search.Contains(address))
                {
                    search.Add(address);

                    //search添加成功后就要告诉来源处,此search已经被成功载入。
                    var client = OperationContext.Current.GetCallbackChannel<ILiveAddressCallback>();
                    client.LiveAddress(address);
                }
            }
        }

        public void GetService(string address)
        {
            Timer timer = new Timer();
            timer.Interval = 1000;
            timer.Elapsed += (obj, sender) =>
            {
                try
                {
                    //这个是定时的检测IIS是否挂掉
                    var factory = new ChannelFactory<IServiceList>(new NetTcpBinding(SecurityMode.None),
                                                                   new EndpointAddress(address));

                    factory.CreateChannel().AddSearchList(search);

                    factory.Close();

                    timer.Interval = 10000;
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            };
            timer.Start();
        }
    }
}


ILiveAddressCallback.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;

namespace HeartBeatService
{
    ///<summary>
/// 等客户端实现后,让客户端约束一下,只能是这个LiveAddress方法
///</summary>
    public interface ILiveAddressCallback
    {
        [OperationContract(IsOneWay = true)]
        void LiveAddress(string address);
    }
}

第四步: 我们开一下心跳,预览下效果:

         是的,心跳现在正在检测是否有活着的Search。

 

第五步:"SearhService" 这个Console程序就是WCF的search,主要用于从MemerCache里面读取索引。

          记得要添加一下对“心跳服务”的服务引用。

IProduct.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Text;

namespace SearhService
{
    // 注意: 使用“重构”菜单上的“重命名”命令,可以同时更改代码和配置文件中的接口名“IService1”。
    [ServiceContract]
    public interface IProduct
    {
        [OperationContract]
        List<int> GetShopListByUserID(int userID);

        [OperationContract]
        void TestSearch();
    }
}


Product.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Text;
using Common;
using System.Xml;
using System.IO;
using System.Xml.Serialization;

namespace SearhService
{
    public class Product : IProduct
    {
        public List<int> GetShopListByUserID(int userID)
        {
            //模拟从MemCache中读取索引
            SerializableDictionary<int, List<int>> dic = new SerializableDictionary<int, List<int>>();

            byte[] bytes = Encoding.UTF8.GetBytes(File.ReadAllText("F://1.txt", Encoding.UTF8));

            var memoryStream = new MemoryStream();

            memoryStream.Write(bytes, 0, bytes.Count());

            memoryStream.Seek(0, SeekOrigin.Begin);

            XmlSerializer xml = new XmlSerializer(dic.GetType());

            var obj = xml.Deserialize(memoryStream) as Dictionary<int, List<int>>;

            return obj[userID];
        }

        public void TestSearch() { }
    }
}


SearchHost.cs 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using System.Configuration;
using System.Timers;
using SearhService.HeartBeatService;

namespace SearhService
{
    public class SearchHost : IAddressCallback
    {
        static DateTime startTime;

        public static void Main()
        {
            ServiceHost host = new ServiceHost(typeof(Product));

            host.Open();

            AddSearch();

            Console.Read();

        }

        static void AddSearch()
        {
            startTime = DateTime.Now;

            Console.WriteLine("Search服务发送中.....\n\n*************************************************\n");

            try
            {
                var heartClient = new AddressClient(new InstanceContext(new SearchHost()));

                string search = ConfigurationManager.AppSettings["search"];

                heartClient.AddSearch(search);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Search服务发送失败:" + ex.Message);
            }
        }

        public void LiveAddress(string address)
        {
            Console.WriteLine("恭喜你," + address + "已被心跳成功接收!\n");
            Console.WriteLine("发送时间:" + startTime + "\n接收时间:" + DateTime.Now);
        }
    }
}

第六步:此时Search服务已经建好,我们可以测试当Search开启获取关闭对心跳有什么影响:

              Search开启时:

                       

          

           Search关闭时:

              

           对的,当Search关闭时,心跳检测该Search已经死掉,然后只能从集群中剔除。

           当然,我们可以将Search拷贝N份,部署在N台机器中,只要修改一下endpoint地址就OK了,这一点明白人都会。

 

第七步:"ClientService" 这里也就指的是IIS,此时我们也要添加一下对心跳的服务引用。

IServiceList.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;

namespace ClientService
{
    [ServiceContract]
    public interface IServiceList
    {
        [OperationContract]
        void AddSearchList(List<string> search);
    }
}


ServiceList.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using System.Configuration;
using System.Timers;
using System.Threading;

namespace ClientService
{
    public class ServiceList : IServiceList
    {
        public static List<string> searchList = new List<string>();

        static object obj = new object();

        public static string Search
        {
            get
            {
                lock (obj)
                {
                    //如果心跳没及时返回地址,客户端就在等候
                    if (searchList.Count == 0)
                        Thread.Sleep(1000);
                    return searchList[new Random().Next(0, searchList.Count)];
                }
            }
            set
            {

            }
        }

        public void AddSearchList(List<string> search)
        {
            lock (obj)
            {
                searchList = search;

                Console.WriteLine("************************************");
                Console.WriteLine("当前存活的Search为:");

                foreach (var single in searchList)
                {
                    Console.WriteLine(single);
                }
            }
        }
    }
}


Program.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using System.Configuration;
using System.Threading;
using ClientService.HeartBeatService;
using SearhService;
using BaseClass;
using System.Data;
using System.Diagnostics;

namespace ClientService
{
    class Program : IAddressCallback
    {
        static void Main(string[] args)
        {

            ServiceHost host = new ServiceHost(typeof(ServiceList));

            host.Open();

            var client = new AddressClient(new InstanceContext(new Program()));

            //配置文件中获取iis的地址
            var iis = ConfigurationManager.AppSettings["iis"];

            //将iis的地址告诉心跳
            client.GetService(iis);

            //从集群中获取search地址来对Search服务进行调用
            var factory = new ChannelFactory<IProduct>(new NetTcpBinding(SecurityMode.None), new EndpointAddress(ServiceList.Search));

            //根据userid获取了shopID的集合
            var shopIDList = factory.CreateChannel().GetShopListByUserID(15);

            //.......................... 后续就是我们将shopIDList做连接数据库查询(做到秒杀)

            Console.Read();
        }

        public void LiveAddress(string address)
        {

        }
    }
}

 

然后我们开启Client,看看效果咋样:


当然,search集群后,client得到search的地址是随机的,也就分担了search的负担,实现有福同享,有难同当的效果了。

 

最后: 我们做下性能检测,看下“秒杀”和“毫秒杀”的效果。

          首先在数据库的User表和Shop插入了180万和20万的数据用于关联。

          ClientService改造后的代码:

Program.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using System.Timers;
using System.Diagnostics;
using BaseClass;
using ClientService;
using ClientService.HeartBeatService;
using System.Configuration;
using SearhService;

namespace ClientService
{
    class Program : IAddressCallback
    {
        static void Main(string[] args)
        {

            ServiceHost host = new ServiceHost(typeof(ServiceList));

            host.Open();

            var client = new AddressClient(new InstanceContext(new Program()));

            //配置文件中获取iis的地址
            var iis = ConfigurationManager.AppSettings["iis"];

            //将iis的地址告诉心跳
            client.GetService(iis);

            //从集群中获取search地址来对Search服务进行调用
            var factory = new ChannelFactory<IProduct>(new NetTcpBinding(SecurityMode.None), new EndpointAddress(ServiceList.Search));

            //根据userid获取了shopID的集合
//比如说这里的ShopIDList是通过索引交并集获取的分页的一些shopID
            var shopIDList = factory.CreateChannel().GetShopListByUserID(15);

            var strSql = string.Join(",", shopIDList);

            Stopwatch watch = new Stopwatch();

            watch.Start();
            SqlHelper.Query("select s.ShopID,u.UserName,s.ShopName  from [User] as u ,Shop as s where s.ShopID in(" + strSql + ")");
            watch.Stop();

            Console.WriteLine("通过wcf索引获取的ID >>>花费时间:" + watch.ElapsedMilliseconds);

            //普通的sql查询花费的时间
            StringBuilder builder = new StringBuilder();

            builder.Append("select * from ");
            builder.Append("(select  ROW_NUMBER() over(order by s.ShopID) as NumberID, ");
            builder.Append(" s.ShopID, u.UserName, s.ShopName ");
            builder.Append("from Shop s left join [User] as u on u.UserID=s.UserID ");
            builder.Append("where  s.UserID=15) as array ");
            builder.Append("where NumberID>300000 and NumberID<300050");

            watch.Start();
            SqlHelper.Query(builder.ToString());
            watch.Stop();

            Console.WriteLine("普通的sql分页 >>>花费时间:" + watch.ElapsedMilliseconds);

            Console.Read();
        }

        public void LiveAddress(string address)
        {

        }
    }
}

性能图:

对的,一个秒杀,一个是毫秒杀,所以越复杂越能展示出“内存索引”的强大之处。

相关文章
|
19天前
|
存储 缓存 NoSQL
分布式系统架构8:分布式缓存
本文介绍了分布式缓存的理论知识及Redis集群的应用,探讨了AP与CP的区别,Redis作为AP系统具备高性能和高可用性但不保证强一致性。文章还讲解了透明多级缓存(TMC)的概念及其优缺点,并详细分析了memcached和Redis的分布式实现方案。此外,针对缓存穿透、击穿、雪崩和污染等常见问题提供了应对策略,强调了Cache Aside模式在解决数据一致性方面的作用。最后指出,面试中关于缓存的问题多围绕Redis展开,建议深入学习相关知识点。
120 8
|
25天前
|
负载均衡 算法
架构学习:7种负载均衡算法策略
四层负载均衡包括数据链路层、网络层和应用层负载均衡。数据链路层通过修改MAC地址转发帧;网络层通过改变IP地址实现数据包转发;应用层有多种策略,如轮循、权重轮循、随机、权重随机、一致性哈希、响应速度和最少连接数均衡,确保请求合理分配到服务器,提升性能与稳定性。
171 11
架构学习:7种负载均衡算法策略
|
1月前
|
存储 Prometheus Cloud Native
分布式系统架构6:链路追踪
本文深入探讨了分布式系统中的链路追踪理论,涵盖追踪与跨度的概念、追踪系统的模块划分及数据收集的三种方式。链路追踪旨在解决复杂分布式系统中请求流转路径不清晰的问题,帮助快速定位故障和性能瓶颈。文中介绍了基于日志、服务探针和边车代理的数据收集方法,并简述了OpenTracing、OpenCensus和OpenTelemetry等链路追踪协议的发展历程及其特点。通过理解这些概念,可以更好地掌握开源链路追踪框架的使用。
94 41
|
13天前
|
消息中间件 算法 调度
分布式系统学习10:分布式事务
本文是小卷关于分布式系统架构学习系列的第13篇,重点探讨了分布式事务的相关知识。随着业务增长,单体架构拆分为微服务后,传统的本地事务无法满足需求,因此需要引入分布式事务来保证数据一致性。文中详细介绍了分布式事务的必要性、实现方案及其优缺点,包括刚性事务(如2PC、3PC)和柔性事务(如TCC、Saga、本地消息表、MQ事务、最大努力通知)。同时,还介绍了Seata框架作为开源的分布式事务解决方案,提供了多种事务模式,简化了分布式事务的实现。
38 5
|
17天前
|
NoSQL 关系型数据库 MySQL
分布式系统学习9:分布式锁
本文介绍了分布式系统中分布式锁的概念、实现方式及其应用场景。分布式锁用于在多个独立的JVM进程间确保资源的互斥访问,具备互斥、高可用、可重入和超时机制等特点。文章详细讲解了三种常见的分布式锁实现方式:基于Redis、Zookeeper和关系型数据库(如MySQL)。其中,Redis适合高性能场景,推荐使用Redisson库;Zookeeper适用于对一致性要求较高的场景,建议基于Curator框架实现;而基于数据库的方式性能较低,实际开发中较少使用。此外,还探讨了乐观锁和悲观锁的区别及适用场景,并介绍了如何通过Lua脚本和Redis的`SET`命令实现原子操作,以及Redisson的自动续期机
66 7
|
22天前
|
存储 缓存 安全
分布式系统架构7:本地缓存
这是小卷关于分布式系统架构学习的第10篇文章,主要介绍本地缓存的基础理论。文章分析了引入缓存的利弊,解释了缓存对CPU和I/O压力的缓解作用,并讨论了缓存的吞吐量、命中率、淘汰策略等属性。同时,对比了几种常见的本地缓存工具(如ConcurrentHashMap、Ehcache、Guava Cache和Caffeine),详细介绍了它们的访问控制、淘汰策略及扩展功能。
47 6
|
2月前
|
人工智能 自然语言处理
RWKV-7:RWKV系列开源最新的大模型架构,具有强大的上下文学习能力,超越传统的Attention范式
RWKV-7是RWKV系列的最新大模型架构版本,具有强大的上下文学习能力,超越了传统的attention和linear attention范式。本文详细介绍了RWKV-7的主要功能、技术原理及其在多语言处理、文本生成等领域的应用场景。
163 7
RWKV-7:RWKV系列开源最新的大模型架构,具有强大的上下文学习能力,超越传统的Attention范式
|
25天前
|
存储 关系型数据库 分布式数据库
[PolarDB实操课] 01.PolarDB分布式版架构介绍
《PolarDB实操课》之“PolarDB分布式版架构介绍”由阿里云架构师王江颖主讲。课程涵盖PolarDB-X的分布式架构、典型业务场景(如实时交易、海量数据存储等)、分布式焦点问题(如业务连续性、一致性保障等)及技术架构详解。PolarDB-X基于Share-Nothing架构,支持HTAP能力,具备高可用性和容错性,适用于多种分布式改造和迁移场景。课程链接:[https://developer.aliyun.com/live/253957](https://developer.aliyun.com/live/253957)。更多内容可访问阿里云培训中心。
[PolarDB实操课] 01.PolarDB分布式版架构介绍
|
2月前
|
设计模式 存储 算法
分布式系统架构5:限流设计模式
本文是小卷关于分布式系统架构学习的第5篇,重点介绍限流器及4种常见的限流设计模式:流量计数器、滑动窗口、漏桶和令牌桶。限流旨在保护系统免受超额流量冲击,确保资源合理分配。流量计数器简单但存在边界问题;滑动窗口更精细地控制流量;漏桶平滑流量但配置复杂;令牌桶允许突发流量。此外,还简要介绍了分布式限流的概念及实现方式,强调了限流的代价与收益权衡。
89 11
|
2月前
|
设计模式 监控 Java
分布式系统架构4:容错设计模式
这是小卷对分布式系统架构学习的第4篇文章,重点介绍了三种常见的容错设计模式:断路器模式、舱壁隔离模式和重试模式。断路器模式防止服务故障蔓延,舱壁隔离模式通过资源隔离避免全局影响,重试模式提升短期故障下的调用成功率。文章还对比了这些模式的优缺点及适用场景,并解释了服务熔断与服务降级的区别。尽管技术文章阅读量不高,但小卷坚持每日更新以促进个人成长。
59 11