配置文件
etcd
配置文件位于/etc/etcd/etcd.conf
,该配置文件一共有5个section
名称 | 作用 |
---|---|
member | 本节点的配置,包括监听服务端口、心跳时间等 |
cluster | 集群配置,包括集群状态、集群名称以及本节点广播地址 |
proxy | 用于网络自动发现服务 |
security | 安全配置 |
logging | 日志功能组件 |
具体配置采集可以见另一个文章《Etcd集群配置和使用》
初次看到配置文件,都会有一个疑问,为什么在members已经设置了监听服务地址,为什么在cluster还要再次设置一次广播地址呢?
原因:etcd主要的通信协议主要是http协议,对于http协议中所周知它是B/S结构,而非C/S结构,只能一端主动给另一端发消息而反过来则不可。所以对于集群来说,双方必须都要知道对方具体监听地址。
服务监听
我们都知道,建立socket服务端一共有5个基本步骤(C语言):
1、创建socket套接字
2、bind地址及端口
3、listen监听服务4
4、accept接收客户端连接
5、启动新线程为客户端服务。
正所谓万变不离其宗,到了etcd中(etcd使用默认golang http模块)也是这些步骤,只不过是被封装了一下(语法糖)
启动流程见《Etcd源码分析:启动篇》
listener
当进入embed/etcd.go
里面的StartEtcd()
函数的时候
//为peer创建listener,socket三部曲只到了第二个步骤
if e.Peers, err = startPeerListeners(cfg); err != nil {
return e, err
}
//为client创建listener,socket三部曲只到了第二个步骤
if e.sctxs, err = startClientListeners(cfg); err != nil {
return e, err
}
在创建了listener之后,开始创建EtcdServer
//创建EtcdServer并且创建raftNode并运行raftNode
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
}
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
······
e.Server.Start()
if err = e.servePeers(); err != nil {
return e, err
}
if err = e.serveClients(); err != nil {
return e, err
}
if err = e.serveMetrics(); err != nil {
return e, err
}
serving = true
Listener有两个分别为:peer listener和client listener,两者大同小异,这里拿peer listener做为分析对象。
func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
······
peers = make([]*peerListener, len(cfg.LPUrls))
······
for i, u := range cfg.LPUrls { //循环遍历多个peer url
if u.Scheme == "http" {
if !cfg.PeerTLSInfo.Empty() {
plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
}
if cfg.PeerTLSInfo.ClientCertAuth {
plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
}
}
// 构造peerListener对象 监听2380 作为服务端模式
peers[i] = &peerListener{close: func(context.Context) error { return nil }}
//调用接口,创建listener对象,返回来之后,socket套接字已经完成listener监听流程
peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
if err != nil {
return nil, err
}
// once serve, overwrite with 'http.Server.Shutdown'
peers[i].close = func(context.Context) error {
return peers[i].Listener.Close()
}
plog.Info("listening for peers on ", u.String())
}
return peers, nil
}
下面调用关系为
startPeerListeners() [embed/etcd.go]
-> rafthttp.NewListener() [rafthttp/util.go]
-> transport.NewTimeoutListener() [pkg/transport/timeout_listener.go]
-> newListener() [pkg/transport/listener.go]
-> net.Listen() [golang net库函数]
服务监听
服务端socket需要调用Accept方法,我们来看一下serve方法。方法serve大致内容为:将每个服务放到gorouting中,也就是启动一个协程来监听服务。
先看看servePeers()
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !e.cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
return err
}
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
return nil
}
}
// start peer servers in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}
return nil
}
1、生成http.hander 用于处理peer请求;
2、在for循环里面,起一些goroutine,调用Server()
函数来接受Listener传入的连接。
我们来看看NewPeerHandler()
func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
mh := &peerMembersHandler{
cluster: cluster,
}
//将url和业务层handler注册到servemux中,也就是每一个url请求都会有其对应的handler进行处理
//初始化一个Serve Multiplexer结构
mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(rafthttp.RaftPrefix, raftHandler)
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
mux.Handle(peerMembersPrefix, mh) //处理请求/members handler是mh,即peerMembersHandler
if leaseHandler != nil {
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))
return mux
}
应用层业务逻辑需要自己注册url和handler,这样才能保证每个http request都能够被处理。而每个handler都必须要实现对应接口ServeHTTP,例如peerMembersHandler,实现的ServeHTTP接口是用于返回集群成员列表
那么此处只是完成注册,那么在什么地方会调用此处handler?
答案是在ServeHTTP()
里面