3.2.4 Manager启动
1. 启动 Cache
启动 Cache,见代码清单 3-17。
func(c*multiNamespaceCache)Start(ctxcontext.Context)error{forns,cache:=rangec.namespaceToCache{
gofunc(nsstring,cacheCache){
//namespaceToCache存储每个ns的 cache,默认是Informers
//Map类型
//cache⼊⼝
err:=cache.Start(ctx)iferr!=nil{
log.Error(err,"multinamespacecachefailedtostartnamespacedinformer","namespace",ns)
}
}(ns,cache)
}
<-ctx.Done()returnnil
}
InformersMap抽象出 3个 Map结构:structured/unstructured/metadata,分别存储不同的 Informer,见代码清单 3-18。
func(m*InformersMap)Start(ctxcontext.Context)error{gom.structured.Start(ctx)
gom.unstructured.Start(ctx)gom.metadata.Start(ctx)
<-ctx.Done()returnnil
}
启动每个 Informer,见代码清单3-19。
func(ip*specificInformersMap)Start(ctxcontext.Context){
func(){
...
for _,informer:=rangeip.informersByGVK{goinformer.Informer.Run(ctx.Done())
}
...
}()
<-ctx.Done()
}
Cache的核心逻辑是初始化内部所有的 Informer,初始化Informer后就创建了Reflector和内部 Controller,Reflector和 Controller 两个组件是一个“生产者—消费者”模型,Reflector负责监听 APIServer上指定的 GVK 资源的变化,然后将变更写入 delta队列中,Controller负责消费这些变更的事件,然后更新本地 Indexer,最后计算出是创建、更新,还是删除事件,推给我们之前注册的 WatchHandler。
2. 启动 Controller
用户自定义的 Controller需要实现 Start方法,程序启动后 controllerManager会自动调用 Start 方法启动 Controller。每个 Controller 会启动一个 Goroutinue,见代码清单3-20。
func(cm*controllerManager)startRunnable(rRunnable){
cm.waitForRunnable.Add(1)
gofunc(){
defercm.waitForRunnable.Done()
iferr:=r.Start(cm.internalCtx);err!=nil{cm.errChan<-err
}
}()
}