带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)

简介: 带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)

2.2.6        Client-go Informer 解析

 

1. Client-goInformer模块

 

Informer可以对 KubernetesAPIServer的资源执行 Watch操作 , 类型可以是Kubernetes内置资源,也可以是 CRD。其中最核心的模块是 Reflector、DeltaFIFO、Indexer。接下来我们逐个进行分析。


首先分析   Reflector,Reflector   用于监控指定资源的    Kubernetes。当资源发生变化时,如发生了资源添加(Added、资源更新Updated等事件,Reflector会将其资源对象存放在本地缓存   DeltaFIFO   中。它的作用就是获取 APIServer中对象数据并实时地更新到本地,使得本地数据和ETCD数据完全一样。它的数据结构见代码清单2-37

typeReflectorstruct{

namestring//这个 Reflector的名称,默认为⽂件 : ⾏数metrics*reflectorMetrics//⽤于保存 Reflector的⼀些监控指标expectedTypereflect.Type//期望放到 Store中的类型名称storeStore//Watch源同步的⽬标Store

listerWatcherListerWatcher//ListerWatcher接⼝,⽤于指定 List-Watch⽅法

period           time.Duration//Watch周期resyncPeriodtime.Duration//重新同步周期ShouldResyncfunc() bool

//clockallowsteststomanipulatetimeclockclock.Clock

lastSyncResourceVersionstring//最后同步的资源的版本号

lastSyncResourceVersionMutexsync.RWMutex//lastSyncResourceVersion的读写锁


}

 

通过 NewRefector实例化 Reflector对象,实例化过程中必须传入 ListerWatcher据接口对象,它拥有ListWatch方法,用于获取及监控资源列表,只要是实现了 ListWatch方法的对象都可以成为 ListerWatcher,Reflector对象通过 run函数启动监控并处理事件,而在 Reflector源码实现中最主要的是List-Watch函数,它负责 List/Watch指定的 KubernetesAPIServer资源,见代码清单 2-38。

//NewNamedReflectorsameasNewReflector,butwithaspecifiednameforloggingfunc NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},storeStore,resyncPeriodtime.Duration)*Reflector{

reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)r:=&Reflector{

name:name,

//weneedthistobeuniqueperprocess(somenamesarestillthesame)butobviouswhoitbelongsto

metrics:        newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.

Sprintf("reflector_"+name+"_%d",reflectorSuffix))),

listerWatcher:lw,store:store,

expectedType:reflect.TypeOf(expectedType),

period:    time.Second,resyncPeriod:resyncPeriod,clock:&clock.RealClock{},

}

returnr

}


List-Watch是怎么实现的? List-Watch主要分为 ListWatch两部分。List责获取对应资源的全量列表,Watch负责获取变化的部分。首先进行 List操作,这里把ResourceVersion设置为 0,因为要获取同步的对象的全部版本,所以从 0开始 List,主要流程如下(见代码清单2-39

(1)r.listerWatcher.List 用于获取资源下的所有对象的数据。

(2)  listMetaInterface.GetResourceVersion   用于获取资源版本号(ResouceVersion,资源版本号非常重要,Kubernetes中所有的资源都拥有该字段,它标识当前资源对象的版本号。每次修改当前资源对象时,KubernetesAPIServer都会更改 ResouceVersion,使Client-go执行 Watch操作时可以根据 ResourceVersion 来确定当前资源对象是否发生过变化。

(3)   meta.ExtractList用于将资源数据转换成资源对象列表,将runtime.Object转换成[]runtime.Object,因为 r.listerWatcher.List只是获取一个列表。

4r.syncWith 用于将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO中,并替换已存在的对象。

(5)r.setLastSyncResourceVersion 用于设置最新的资源版本号。


func(r*Reflector)ListAndWatch(stopCh<-chanstruct{})error{glog.V(3).Infof("Listingandwatching%vfrom%s",r.expectedType,r.name)varresourceVersionstring

options:=metav1.ListOptions{ResourceVersion:"0"}r.metrics.numberOfLists.Inc()

start:=r.clock.Now()

list,err:=r.listerWatcher.List(options)iferr!=nil{

returnfmt.Errorf("%s:Failedtolist%v:%v",r.name,r.expectedType,

err)

}

r.metrics.listDuration.Observe(time.Since(start).Seconds())listMetaInterface,err:=meta.ListAccessor(list)

iferr!=nil{

returnfmt.Errorf("%s:Unabletounderstandlistresult%#v:%v",


r.name,list,err)

}

resourceVersion=listMetaInterface.GetResourceVersion()items,err:=meta.ExtractList(list)

iferr!=nil{

returnfmt.Errorf("%s:Unabletounderstandlistresult%#v(%v)",r.name,list,err)

}

r.metrics.numberOfItemsInList.Observe(float64(len(items)))

iferr:=r.syncWith(items,resourceVersion);err!=nil{

returnfmt.Errorf("%s:Unabletosynclistresult:%v",r.name,err)

}

r.setLastSyncResourceVersion(resourceVersion)

 

resyncerrc:=make(chanerror,1)


cancelCh:=make(chanstruct{})deferclose(cancelCh)

gofunc(){

resyncCh,cleanup:=r.resyncChan()deferfunc(){

cleanup()//Callthelastonewrittenintocleanup

}()

for{

select{

case<-resyncCh:case<-stopCh:

returncase<-cancelCh:

return

}

ifr.ShouldResync==nil||r.ShouldResync(){glog.V(4).Infof("%s:forcingresync",r.name)iferr:=r.store.Resync();err!=nil{

resyncerrc<-errreturn

}

}

}()

 

for{


cleanup()

resyncCh,cleanup=r.resyncChan()

}

//givethestopChachancetostoptheloop,evenincaseofcontinue


statementsfurtherdownonerrorsselect{

case<-stopCh:

returnnildefault:

}

 

timeoutSeconds:=int64(minWatchTimeout.Seconds()* (rand.Float64()+


1.0))


options=metav1.ListOptions{ResourceVersion:resourceVersion,TimeoutSeconds:&timeoutSeconds,


}

 

r.metrics.numberOfWatches.Inc()

w,err:=r.listerWatcher.Watch(options)


 

 

iferr!=nil{

switcherr{

caseio.EOF:

//watchclosednormally

caseio.ErrUnexpectedEOF:

glog.V(1).Infof("%s:Watchfor%vclosedwithunexpected

EOF:%v",r.name, r.expectedType, err)

default:

utilruntime.HandleError(fmt.Errorf("%s:Failedtowatch

%v:%v",r.name,r.expectedType,err))

}

ifurlError,ok:=err.(*url.Error);ok{

ifopError,ok:=urlError.Err.(*net.OpError);ok{

iferrno,ok:=opError.Err.(syscall.Errno);ok&&

errno==syscall.ECONNREFUSED{

time.Sleep(time.Second)continue

}

}

}

returnnil

}

 


 

err!=nil{


iferr:=r.watchHandler(w,&resourceVersion,resyncerrc,stopCh);

 

iferr!=errorStopRequested{

glog.Warningf("%s:watchof%vendedwith:%v",r.name,


r.expectedType,err)

}

returnnil

}

}

}

 

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
6月前
|
运维 监控 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
国诚投顾携手阿里云,依托Serverless架构实现技术全面升级,构建高弹性、智能化技术底座,提升业务稳定性与运行效率。通过云原生API网关、微服务治理与智能监控,实现流量精细化管理与系统可观测性增强,打造安全、敏捷的智能投顾平台,助力行业数字化变革。
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
|
8月前
|
Kubernetes Cloud Native 安全
云原生机密计算新范式 PeerPods技术方案在阿里云上的落地和实践
PeerPods 技术价值已在阿里云实际场景中深度落地。
|
6月前
|
运维 监控 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
通过与阿里云深度合作,国诚投顾完成了从传统 ECS 架构向云原生 Serverless 架构的全面转型。新的技术架构不仅解决了原有系统在稳定性、弹性、运维效率等方面的痛点,还在成本控制、API 治理、可观测性、DevOps 自动化等方面实现了全方位升级。
|
8月前
|
Kubernetes Cloud Native 安全
云原生机密计算新范式 PeerPods 技术方案在阿里云上的落地和实践
PeerPods 技术价值已在阿里云实际场景中深度落地。
|
4月前
|
人工智能 Cloud Native 算法
拔俗云原生 AI 临床大数据平台:赋能医学科研的开发者实践
AI临床大数据科研平台依托阿里云、腾讯云,打通医疗数据孤岛,提供从数据治理到模型落地的全链路支持。通过联邦学习、弹性算力与安全合规技术,实现跨机构协作与高效训练,助力开发者提升科研效率,推动医学AI创新落地。(238字)
310 7
|
6月前
|
弹性计算 运维 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生Serverless实践
简介: 通过与阿里云深度合作,国诚投顾完成了从传统 ECS 架构向云原生 Serverless 架构的全面转型。新的技术架构不仅解决了原有系统在稳定性、弹性、运维效率等方面的痛点,还在成本控制、API 治理、可观测性、DevOps 自动化等方面实现了全方位升级。
178 1
|
5月前
|
存储 弹性计算 Cloud Native
云原生数据库的演进与应用实践
随着企业业务扩展,传统数据库难以应对高并发与弹性需求。云原生数据库应运而生,具备计算存储分离、弹性伸缩、高可用等核心特性,广泛应用于电商、金融、物联网等场景。阿里云PolarDB、Lindorm等产品已形成完善生态,助力企业高效处理数据。未来,AI驱动、Serverless与多云兼容将推动其进一步发展。
271 8
|
7月前
|
Cloud Native 中间件 调度
云原生信息提取系统:容器化流程与CI/CD集成实践
本文介绍如何通过工程化手段解决数据提取任务中的稳定性与部署难题。结合 Scrapy、Docker、代理中间件与 CI/CD 工具,构建可自动运行、持续迭代的云原生信息提取系统,实现结构化数据采集与标准化交付。
379 1
云原生信息提取系统:容器化流程与CI/CD集成实践
|
7月前
|
人工智能 安全 Java
Go与Java泛型原理简介
本文介绍了Go与Java泛型的实现原理。Go通过单态化为不同类型生成函数副本,提升运行效率;而Java则采用类型擦除,将泛型转为Object类型处理,保持兼容性但牺牲部分类型安全。两种机制各有优劣,适用于不同场景。
276 24

热门文章

最新文章