OpenIM服务发现和负载均衡golang插件:gRPC接入etcdv3

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
应用型负载均衡 ALB,每月750个小时 15LCU
简介: OpenIM集成此插件,实现了轻量级的服务发现机制,打造了基于集群的IM服务,各模块很方便平行扩展,方便运维,本文把思路和代码分享给大家。

etcd简介

etcd是CoreOS团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,etcd基于Go语言实现。

etcd作为服务发现系统,有以下的特点:

  • 简单:安装配置简单,而且提供了HTTP API进行交互,使用也很简单
  • 安全:支持SSL证书验证
  • 快速:根据官方提供的benchmark数据,单实例支持每秒2k+读操作
  • 可靠:采用raft算法,实现分布式系统数据的可用性和一致性

etcd项目地址:https://github.com/coreos/etcd/

etcd典型应用场景-服务发现

etcd比较多的应用场景是用于服务发现,服务发现(Service Discovery)要解决的是分布式系统中最常见的问题之一,即在同一个分布式集群中的进程或服务如何才能找到对方并建立连接。

从本质上说,服务发现就是要了解集群中是否有进程在监听upd或者tcp端口,并且通过名字就可以进行查找和链接。

要解决服务发现的问题,需要下面三大支柱,缺一不可。

  • 一个强一致性、高可用的服务存储目录。

基于Ralf算法的etcd天生就是这样一个强一致性、高可用的服务存储目录。

  • 一种注册服务和健康服务健康状况的机制。

用户可以在etcd中注册服务,并且对注册的服务配置key TTL,定时保持服务的心跳以达到监控健康状态的效果。

  • 一种查找和连接服务的机制。

通过在etcd指定的主题下注册的服务业能在对应的主题下查找到。为了确保连接,我们可以在每个服务机器上都部署一个proxy模式的etcd,这样就可以确保访问etcd集群的服务都能够互相连接。

1.png

gRPC简介

gRPC是谷歌开源的一款跨平台、高性能的RPC框架。gRPC是一个现代的开源高性能RPC框架,可以在任何环境下运行。在实际开发过程中,主要使用它来进行后端微服务的开发。

在gRPC中,客户端应用程序可以像本地对象那样直接调用另一台计算机上的服务器应用程序上的方法,从而更容易创建分布式应用程序和服务。与许多RPC系统一样,gRPC基于定义服务的思想,可以通过设置参数和返回类型来远程调用方法。在服务端,实现这个接口并运行gRPC服务器来处理客户端调用。客户端提供的方法(客户端与服务端的方法相同)。

如图所示,gRPC客户端和服务端可以在各种环境中运行和相互通信,并且可以用gRPC支持的任何语言编写。因此,可以用Go语言创建一个gRPC服务器,同时供PHP客户端和Android客户端等多个客户端调用,从而突破开发语言的限制。

2.jpg

服务注册:register.go

``

/etcdAddr separated by commas

func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error {

  cli, err := clientv3.New(clientv3.Config{

     Endpoints: strings.Split(etcdAddr, ","),

  })

  fmt.Println("RegisterEtcd")

  if err != nil {

     //    return fmt.Errorf("grpclb: create clientv3 client failed: %v", err)

     return fmt.Errorf("create etcd clientv3 client failed, errmsg:%v, etcd addr:%s", err, etcdAddr)

  }

  //lease

  ctx, cancel := context.WithCancel(context.Background())

  resp, err := cli.Grant(ctx, int64(ttl))

  if err != nil {

     return fmt.Errorf("grant failed")

  }

  //  schema:///serviceName/ip:port ->ip:port

  serviceValue := net.JoinHostPort(myHost, strconv.Itoa(myPort))

  serviceKey := GetPrefix(schema, serviceName) + serviceValue

  //set key->value

  if _, err := cli.Put(ctx, serviceKey, serviceValue, clientv3.WithLease(resp.ID)); err != nil {

     return fmt.Errorf("put failed, errmsg:%v, key:%s, value:%s", err, serviceKey, serviceValue)

  }

  //keepalive

  kresp, err := cli.KeepAlive(ctx, resp.ID)

  if err != nil {

     return fmt.Errorf("keepalive faild, errmsg:%v, lease id:%d", err, resp.ID)

  }

  go func() {

  FLOOP:

     for {

        select {

        case _, ok := <-kresp:

           if ok == true {

           } else {

              break FLOOP

           }

        }

     }

  }()

  rEtcd = &RegEtcd{ctx: ctx,

     cli:    cli,

     cancel: cancel,

     key:    serviceKey}

  return nil

}

grpc模块在启动时调用RegisterEtcd注册,并定时lease

命名解析实现及服务发现:resolver.go

``

type Resolver struct {

  cc                 resolver.ClientConn

  serviceName        string

  grpcClientConn     *grpc.ClientConn

  cli                *clientv3.Client

  schema             string

  etcdAddr           string

  watchStartRevision int64

}

var (

  nameResolver        = make(map[string]*Resolver)

  rwNameResolverMutex sync.RWMutex

)

func NewResolver(schema, etcdAddr, serviceName string) (*Resolver, error) {

  etcdCli, err := clientv3.New(clientv3.Config{

     Endpoints: strings.Split(etcdAddr, ","),

  })

  if err != nil {

     return nil, err

  }

  var r Resolver

  r.serviceName = serviceName

  r.cli = etcdCli

  r.schema = schema

  r.etcdAddr = etcdAddr

  resolver.Register(&r)

  conn, err := grpc.Dial(

     GetPrefix(schema, serviceName),

     grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),

     grpc.WithInsecure(),

     grpc.WithTimeout(time.Duration(5)*time.Second),

  )

  if err == nil {

     r.grpcClientConn = conn

  }

  return &r, err

}

func (r1 *Resolver) ResolveNow(rn resolver.ResolveNowOptions) {

}

func (r1 *Resolver) Close() {

}

func GetConn(schema, etcdaddr, serviceName string) *grpc.ClientConn {

  rwNameResolverMutex.RLock()

  r, ok := nameResolver[schema+serviceName]

  rwNameResolverMutex.RUnlock()

  if ok {

     return r.grpcClientConn

  }

  rwNameResolverMutex.Lock()

  r, ok = nameResolver[schema+serviceName]

  if ok {

     rwNameResolverMutex.Unlock()

     return r.grpcClientConn

  }

  r, err := NewResolver(schema, etcdaddr, serviceName)

  if err != nil {

     rwNameResolverMutex.Unlock()

     return nil

  }

  nameResolver[schema+serviceName] = r

  rwNameResolverMutex.Unlock()

  return r.grpcClientConn

}

func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {

  if r.cli == nil {

     return nil, fmt.Errorf("etcd clientv3 client failed, etcd:%s", target)

  }

  r.cc = cc

  ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)

  //     "%s:///%s"

  prefix := GetPrefix(r.schema, r.serviceName)

  // get key first

  resp, err := r.cli.Get(ctx, prefix, clientv3.WithPrefix())

  if err == nil {

     var addrList []resolver.Address

     for i := range resp.Kvs {

        fmt.Println("init addr: ", string(resp.Kvs[i].Value))

        addrList = append(addrList, resolver.Address{Addr: string(resp.Kvs[i].Value)})

     }

     r.cc.UpdateState(resolver.State{Addresses: addrList})

     r.watchStartRevision = resp.Header.Revision + 1

     go r.watch(prefix, addrList)

  } else {

     return nil, fmt.Errorf("etcd get failed, prefix: %s", prefix)

  }

  return r, nil

}

func (r *Resolver) Scheme() string {

  return r.schema

}

func exists(addrList []resolver.Address, addr string) bool {

  for _, v := range addrList {

     if v.Addr == addr {

        return true

     }

  }

  return false

}

func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {

  for i := range s {

     if s[i].Addr == addr {

        s[i] = s[len(s)-1]

        return s[:len(s)-1], true

     }

  }

  return nil, false

}

func (r *Resolver) watch(prefix string, addrList []resolver.Address) {

  rch := r.cli.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrefix())

  for n := range rch {

     flag := 0

     for _, ev := range n.Events {

        switch ev.Type {

        case mvccpb.PUT:

           if !exists(addrList, string(ev.Kv.Value)) {

              flag = 1

              addrList = append(addrList, resolver.Address{Addr: string(ev.Kv.Value)})

              fmt.Println("after add, new list: ", addrList)

           }

        case mvccpb.DELETE:

           fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value))

           i := strings.LastIndexAny(string(ev.Kv.Key), "/")

           if i < 0 {

              return

           }

           t := string(ev.Kv.Key)[i+1:]

           fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value), "addr:", t)

           if s, ok := remove(addrList, t); ok {

              flag = 1

              addrList = s

              fmt.Println("after remove, new list: ", addrList)

           }

        }

     }

     if flag == 1 {

        r.cc.UpdateState(resolver.State{Addresses: addrList})

        fmt.Println("update: ", addrList)

     }

  }

}

客户端先通过GetConn获取conn,然后再调用grpc服务,调用后不用关闭conn

服务端示例代码:server.go

``

getcdv3.RegisterEtcd ("sk", etcdAddr, "127.0.0.1", port, "myrpc1", 10)

s := grpc.NewServer()

helloworld.RegisterHelloServer(s, &server{})

s.Serve(listener)

客户端示例代码:client.go

``

p := getcdv3.GetConn("sk", etcdAddr, "myrpc1")

client := helloworld.NewHelloClient(p)

resp1, err := client.SayHello(context.Background(), &helloworld.HelloReq{Req: "world"})

总结:OpenIM集成此插件,实现了轻量级的服务发现机制,打造了基于集群的IM服务,各模块很方便平行扩展,方便运维。在使用grpc、etcd过程中,特别注意版本兼容问题,具体可以参考OpenIM的go.mod文件


更多阅读

基于Tablestore Timeline的IM(即时通讯)消息系统架构 - 架构篇

OpenIM官网

【原创】开源OpenIM:高性能、可伸缩、易扩展的即时通讯架构

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
Go 数据库
Golang 语言编写 gRPC 实战项目
Golang 语言编写 gRPC 实战项目
134 0
|
存储 JSON Go
Golang 语言 gRPC 服务怎么同时支持 gRPC 和 HTTP 客户端调用?
Golang 语言 gRPC 服务怎么同时支持 gRPC 和 HTTP 客户端调用?
194 0
|
2月前
|
Go
Golang语言之gRPC程序设计示例
这篇文章是关于Golang语言使用gRPC进行程序设计的详细教程,涵盖了RPC协议的介绍、gRPC环境的搭建、Protocol Buffers的使用、gRPC服务的编写和通信示例。
93 3
Golang语言之gRPC程序设计示例
|
22天前
|
负载均衡 网络协议 调度
Docker Swarm服务发现与负载均衡
【10月更文挑战第8天】
23 1
|
3月前
|
负载均衡 算法 微服务
基于gRPC的注册发现与负载均衡的原理和实战
基于gRPC的注册发现与负载均衡的原理和实战
|
4月前
|
运维 监控 测试技术
Golang质量生态建设问题之接入并使用Go单元测试插件的问题如何解决
Golang质量生态建设问题之接入并使用Go单元测试插件的问题如何解决
|
4月前
|
自然语言处理 Go 数据处理
云计算自旋锁问题之引入Golang插件系统后iLogtail的输入输出通道和处理能力如何解决
云计算自旋锁问题之引入Golang插件系统后iLogtail的输入输出通道和处理能力如何解决
36 1
|
4月前
|
负载均衡 Kubernetes 算法
K8s服务发现与负载均衡的技术探索
【7月更文挑战第15天】K8s通过Service资源对象和kube-proxy组件实现了高效、灵活的服务发现和负载均衡机制。通过合理选择Service类型、优化kube-proxy配置以及使用Ingress进行高级路由,可以确保应用在K8s集群中高效、可靠地运行。随着云原生技术的不断发展,K8s的服务发现和负载均衡机制也将不断完善和优化,为更多场景提供强大的支持。
|
4月前
|
负载均衡 监控 Kubernetes
Service Mesh 是一种用于处理服务间通信的基础设施层,它通常与微服务架构一起使用,以提供诸如服务发现、负载均衡、熔断、监控、追踪和安全性等功能。
Service Mesh 是一种用于处理服务间通信的基础设施层,它通常与微服务架构一起使用,以提供诸如服务发现、负载均衡、熔断、监控、追踪和安全性等功能。
|
4月前
|
消息中间件 存储 负载均衡
消息队列 MQ使用问题之如何在grpc客户端中设置负载均衡器
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。