带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十三)

简介: 带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十二)

1. Client-goIndexer

 

     资源对象从 DeltaFIFOPop 出去后又经过了哪些处理呢。这要从一开始的 sharedIndexInformer说起。注意,在 sharedIndexInformerRun 方法中,初始化了它的配置,并执行了 s.controller.Run方法。我们可以看到s.controller.Run中初始化了 Reflector,开始了指定资源的List-Watch 操作,并且同步到了DeltaFIFO中,同时执行了processLoop方法。此时我们可以看到 processLoop方法不断从DeltaFIFO中将资源对象 Pop来, 并且交给了之前的 c.config.Process方法进行处理。而c.config.Process方法就是sharedIndexInformerHandleDeltas方法,具体见代码清单 2-45



func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){

...

cfg:=&Config{

Queue:              fifo,

ListerWatcher:             s.listerWatcher,ObjectType:          s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:                 false,

ShouldResync:       s.processor.shouldResync,

 

Process:         s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,

}

 

func(){

s.startedLock.Lock()

defers.startedLock.Unlock()

 

...

}()


s.controller=New(cfg)s.controller.(*controller).clock=s.clocks.started=true

s.controller.Run(stopCh)

}

 

func(c*controller)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()

gofunc(){

<-stopCh

c.config.Queue.Close()

}()

r:=NewReflector(


c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,

)

r.ShouldResync=c.config.ShouldResync

r.clock=c.clock

...

}

c.reflectorMutex.Lock()c.reflector=rc.reflectorMutex.Unlock()

 

wait.Until(c.processLoop,time.Second,stopCh)

func(c*controller)processLoop(){for{

obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))

iferr!=nil{

iferr==FIFOClosedError{

return

}

ifc.config.RetryOnError{

//Thisisthesafewaytore-enqueue.

c.config.Queue.AddIfNotPresent(obj)

}

}

}

}

 

 

综上可知,由 DeltaFIFOPop出来的对象最后交给了 HandleDeltas进行处理,而HandleDeltas中,将资源对象同步到了 Indexer中,至此我们引出了 Informer模块中的3个组件 Indexer。IndexerClient-go 中实现的一个本地存储,它可以建立索引并存Resource的对象。Reflector通过 DeltaFIFOQueue将资源对象存储到Indexer中。需要注意的是,Indexer中的数据与 ETCD中的数据是完全一致的,当 Client-go需要数据时,无须每次都从 APIServer中获取,从而减轻了请求过多造成的对 APIServer的压力, 具体见代码清单 2-46。

func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{

s.blockDeltas.Lock()

defers.blockDeltas.Unlock()

 

//fromoldesttonewest

for_,d:=rangeobj.(Deltas){switchd.Type{

caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)

ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&

exists{

if err:=s.indexer.Update(d.Object);err!=nil{returnerr


}

 

requestedresync 

nil{

 

==nil{


isSync:=falseswitch{

cased.Type==Sync:

//Synceventsareonlypropagatedtolistenersthat

 

isSync=true

cased.Type==Replaced:

ifaccessor,err:=meta.Accessor(d.Object);err==ifoldAccessor,err:=meta.Accessor(old);err

//Replacedeventsthatdidn'tchange


resourceVersionaretreatedasresyncevents

//andonlypropagatedtolisteners


thatrequestedresync

 

==oldAccessor.GetResourceVersion()

}


isSync=accessor.GetResourceVersion()

}

}

s.processor.distribute(updateNotification{oldObj:old,

newObj:d.Object},isSync)

}else{

if err:=s.indexer.Add(d.Object);err!=nil{returnerr

 

false)

}

s.processor.distribute(addNotification{newObj:d.Object},

 

}

caseDeleted:

iferr:=s.indexer.Delete(d.Object);err!=nil{


 

returnerr

}

s.processor.distribute(deleteNotification{oldObj:d.Object},false)

}

}

returnnil

}

 

Indexer   是如何实现存储并快速查找资源的呢?我们先看一下 Indexer接口提供的功能。CacheIndexer的一种非常经典的实现,所有的对象缓存在内存中,而且从Cache 这个类型的名称来看它属于包内私有类型,外部无法直接使用,只能通过专用的函数创建。 这里的 Store、Indexer使用了一个 threadSafeMap来保证并发安全的存储。它拥有存储相关的增、删、改、查等方法。threadSafeMap继承了 Store接口,而 Indexer扩展了threadSafeMap, 为 threadSafeMap提供了索引操作。threadSafeMap其实只能够存储和索引。存储即将runtime.object存储到 ItemsMap中;索引即为ItemsMap建立三层索引:IndicesMap类型索引namespace、nodeName);IndexMap 类型索引(namespace1、namespace2……);runtime.object类型索引,实现见代码清 2-47


typeIndexerinterface{Store

//indexName索引类,obj是对象,计算objindexName索引类中的索引键,通过索引键

获取所有的对象

//基本就是获取符合obj特征的所有对象,所谓的特征就是对象在索引类中的索引键

Index(indexNamestring,objinterface{})([]interface{},error)

//indexKeyindexName索引类中的⼀个索引键,函数返回indexKey指定的所有对象键

IndexKeys(indexName,indexedValuestring)([]string,error)

//获取indexName索引类中的所有索引键

ListIndexFuncValues(indexNamestring)[]string

//这个函数和 Index类似,只是返回值不是对象键,⽽是所有对象

ByIndex(indexName,indexedValuestring)([]interface{},error)

//返回Indexers

GetIndexers()Indexers

//添加Indexers,就是增加更多的索引分类

AddIndexers(newIndexersIndexers)error

}

 

Kubernetes中使用的比较多的索引函数是MetaNamespaceIndexFunc() 代码位置:

 

client-go/tools/cache/index.go,Indexer索引的实现是通过index.ByIndex来完成的, index.ByIndex的实现见代码清单 2-48。这个函数返回了符合索引函数的值的对象列表。



func(c*threadSafeMap)ByIndex(indexName,indexKeystring)([]interface{},error){c.lock.RLock()

deferc.lock.RUnlock()

 

indexFunc:=c.indexers[indexName]ifindexFunc==nil{

returnnil,fmt.Errorf("Indexwithname%sdoesnotexist",indexName)

}

index:=c.indices[indexName]set:=index[indexKey]

list:=make([]interface{},0,set.Len())for_,key:=rangeset.List(){

list=append(list,c.items[key])

}

 

returnlist,nil


}

 

上述方法接收两个参数:indexName(索引器的名称)indexedValue需要索引的 Key。首先根据索引器名称查找指定的索引器函数c.indexers[indexName]);然后根据索引器名称查找相应的缓存器函数(c.indices[indexName]) ;最后根据索引 Key

indexedValue)从缓存中进行数据查询,并返回查询结果。

相关文章
|
4月前
|
运维 监控 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
国诚投顾携手阿里云,依托Serverless架构实现技术全面升级,构建高弹性、智能化技术底座,提升业务稳定性与运行效率。通过云原生API网关、微服务治理与智能监控,实现流量精细化管理与系统可观测性增强,打造安全、敏捷的智能投顾平台,助力行业数字化变革。
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
|
4月前
|
运维 监控 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
通过与阿里云深度合作,国诚投顾完成了从传统 ECS 架构向云原生 Serverless 架构的全面转型。新的技术架构不仅解决了原有系统在稳定性、弹性、运维效率等方面的痛点,还在成本控制、API 治理、可观测性、DevOps 自动化等方面实现了全方位升级。
|
6月前
|
Kubernetes Cloud Native 安全
云原生机密计算新范式 PeerPods技术方案在阿里云上的落地和实践
PeerPods 技术价值已在阿里云实际场景中深度落地。
|
6月前
|
Kubernetes Cloud Native 安全
云原生机密计算新范式 PeerPods 技术方案在阿里云上的落地和实践
PeerPods 技术价值已在阿里云实际场景中深度落地。
|
2月前
|
人工智能 Cloud Native 算法
拔俗云原生 AI 临床大数据平台:赋能医学科研的开发者实践
AI临床大数据科研平台依托阿里云、腾讯云,打通医疗数据孤岛,提供从数据治理到模型落地的全链路支持。通过联邦学习、弹性算力与安全合规技术,实现跨机构协作与高效训练,助力开发者提升科研效率,推动医学AI创新落地。(238字)
|
4月前
|
弹性计算 运维 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生Serverless实践
简介: 通过与阿里云深度合作,国诚投顾完成了从传统 ECS 架构向云原生 Serverless 架构的全面转型。新的技术架构不仅解决了原有系统在稳定性、弹性、运维效率等方面的痛点,还在成本控制、API 治理、可观测性、DevOps 自动化等方面实现了全方位升级。
156 1
|
3月前
|
存储 弹性计算 Cloud Native
云原生数据库的演进与应用实践
随着企业业务扩展,传统数据库难以应对高并发与弹性需求。云原生数据库应运而生,具备计算存储分离、弹性伸缩、高可用等核心特性,广泛应用于电商、金融、物联网等场景。阿里云PolarDB、Lindorm等产品已形成完善生态,助力企业高效处理数据。未来,AI驱动、Serverless与多云兼容将推动其进一步发展。
209 8
|
8月前
|
运维 Cloud Native 测试技术
极氪汽车云原生架构落地实践
随着极氪数字业务的飞速发展,背后的 IT 技术也在不断更新迭代。极氪极为重视客户对服务的体验,并将系统稳定性、业务功能的迭代效率、问题的快速定位和解决视为构建核心竞争力的基石。
|
5月前
|
Cloud Native 中间件 调度
云原生信息提取系统:容器化流程与CI/CD集成实践
本文介绍如何通过工程化手段解决数据提取任务中的稳定性与部署难题。结合 Scrapy、Docker、代理中间件与 CI/CD 工具,构建可自动运行、持续迭代的云原生信息提取系统,实现结构化数据采集与标准化交付。
182 1
云原生信息提取系统:容器化流程与CI/CD集成实践
|
5月前
|
人工智能 安全 Java
Go与Java泛型原理简介
本文介绍了Go与Java泛型的实现原理。Go通过单态化为不同类型生成函数副本,提升运行效率;而Java则采用类型擦除,将泛型转为Object类型处理,保持兼容性但牺牲部分类型安全。两种机制各有优劣,适用于不同场景。
196 24