聚焦目标
了解Informer在发现资源变化后,是怎么处理的
目录
Process
func (c *controller) processLoop() {
for {
// Pop出Object元素
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// 重新进队列
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
// 去查看Pop的具体实现
func (f *FIFO) Pop(process PopProcessFunc) (interface{
}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 调用process去处理item,然后返回
item, ok := f.items[id]
delete(f.items, id)
err := process(item)
return item, err
}
}
// 然后去查一下 PopProcessFunc 的定义,在创建controller前
cfg := &Config{
Process: s.HandleDeltas,
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{
}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
for _, d := range obj.(Deltas) {
switch d.Type {
// 增、改、替换、同步
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
// 先去indexer查询
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 如果数据已经存在,就执行Update逻辑
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// 分发Update事件
s.processor.distribute(updateNotification{
oldObj: old, newObj: d.Object}, isSync)
} else {
// 没查到数据,就执行Add操作
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 分发 Add 事件
s.processor.distribute(addNotification{
newObj: d.Object}, false)
}
// 删除
case Deleted:
// 去indexer删除
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 分发 delete 事件
s.processor.distribute(deleteNotification{
oldObj: d.Object}, false)
}
}
return nil
}
Index
Index
的定义为资源的本地存储,保持与etcd中的资源信息一致。
// 我们去看看Index是怎么创建的
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{
}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{
clock: realClock},
// indexer 的初始化
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
// 生成一个map和func组合而成的Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{
}),
keyFunc: keyFunc,
}
// ThreadSafeStore的底层是一个并发安全的map,具体实现我们暂不考虑
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{
}{
},
indexers: indexers,
indices: indices,
}
}
distribute
// 在上面的Process代码中,我们看到了将数据存储到Indexer后,调用了一个分发的函数
s.processor.distribute()
// 分发process的创建
func NewSharedIndexInformer() SharedIndexInformer {
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{
clock: realClock},
}
return sharedIndexInformer
}
// sharedProcessor的结构
type sharedProcessor struct {
listenersStarted bool
// 读写锁
listenersLock sync.RWMutex
// 普通监听列表
listeners []*processorListener
// 同步监听列表
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
// 查看distribute函数
func (p *sharedProcessor) distribute(obj interface{
}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 将object分发到 同步监听 或者 普通监听 的列表
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
// 这个add的操作是利用了channel
func (p *processorListener) add(notification interface{
}) {
p.addCh <- notification
}
Summary
Informer
依赖于Reflector
模块,它有个组件为 xxxInformer,如podInformer
- 具体资源的
Informer
包含了一个连接到kube-apiserver
的client
,通过List
和Watch
接口查询资源变更情况 - 检测到资源发生变化后,通过
Controller
将数据放入队列DeltaFIFOQueue
里,生产阶段完成 - 在
DeltaFIFOQueue
的另一端,有消费者在不停地处理资源变化的事件,处理逻辑主要分2步- 将数据保存到本地存储Indexer,它的底层实现是一个并发安全的threadSafeMap
- 有些组件需要实时关注资源变化,会实时监听listen,就将事件分发到对应注册上来的listener上,自行处理