带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)

简介: 带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)

Watch操作通过 HTTPKubernetesAPIServer建立长链接, 接收 KubernetesAPIServer发来的变更时间,Watch操作的实现机制使用的是 HTTP的分块传输编码。Client-go调 用 KubernetesAPIServer 时, 在 ResponseHTTPHeader 中 设 置Transfer-Encoding的值为 Chunked。r.listerWatcher.Watch实际调用了 PodInformerwatchfunc函数。通过 ClientSet客户端与 APIServer 建立长链接,监控指定资源的变更事件。r.watchHandler用于处理资源的变更时间,当初发增删改AddedUpdated等事件时,将对应的资源对象更新到本地缓存 DeltaFIFO中,并更新 ResouceVersion。至此实现了 Reflctor组件的功能。

 

1. Client-goDeltaFIFO


DeltaFIFO是一个 FIFO队列,记录了资源对象的变化过程。作为一个 FIFO队列,它的生产者就是 Reflector组件,前面讲过 Reflector将监听对象同步到 DeltaFIFO中,DeltaFIFO对这些资源对象做了什么,见代码清单2-40。

typeDeltaFIFOstruct{

locksync.RWMutex

condsync.Cond//条件变量,唤醒等待的协程

itemsmap[string]Deltas//Delta存储桶

queue[]string//队列存储对象键实际就是和items⼀起形成了⼀个有序Map

//true通过Replace() 第⼀批元素被插⼊队列或者Delete/Add/Update⾸次被调⽤

populatedbool

//Replace() 被⾸次调⽤时插⼊的元素数⽬

initialPopulationCountint

 

//函数计算元素Key

keyFuncKeyFunc

 

//列出已知的对象

knownObjectsKeyListerGetter

//队列是否被关闭,关闭互斥锁

closedboolclosedLocksync.Mutex


}

FIFO接收 ReflectorAdds/Updates 添加和更新事件,并将它们按照顺序放入队列。元素在队列中被处理之前,如果有多个Adds/Updates 事件发生,事件只会被处理一次。

使用场景1仅处理对象一次2处理完当前事件后才能处理最新版本的对象;3删除对象之后不会处理4不能周期性重新处理对象。这里的Delta对象就Kubernetes系统中对象的变化。DeltaTypeObject两个属性DeltaType就是资源变化的类型, 比如 AddUpdateDeltaObject就是具体的 Kubernetes资源对象,见代码清单 2-41。例如,此时 Reflector中监听了一个PodAAdd事件,那么此DeltaType就是AddedDeltaObject就是 PodADeltaFIFO中的数据是什么样的呢?此时 Items中会有 Add类型的 Delta,Queue中也会有这个事件的 Key。这个 KeyKeyFunc生成。Client-go中默认的 KeyFuncMetaNamespaceKeyFunc,可以在tools/cache/store.go:76中找到。由 MetaNamespaceKeyFunc生成的 Key格式为/,用来标识不同命名空间下的不同资源。

typeDeltastruct{

Type    DeltaType                   //Delta类型,⽐如增、减,后⾯有详细说明

Objectinterface{}                  //对象,Delta的粒度是⼀个对象

}

typeDeltaTypestring                  //Delta的类型⽤字符串表达

const(

AddedDeltaType= "Added" //增加UpdatedDeltaType= "Updated" //更新DeletedDeltaType= "Deleted" //删除SyncDeltaType= "Sync" //同步

)

typeDeltas[]Delta                    //Delta数组

 


既然 DeltaFIFO是一个 FIFO,那么它就应该有基本的 FIFO功能,这里 DeltaFIFO实现了 Queue接口。下面看一下 Queue接口功能的定义。我们可以看出 Queue扩展了Store接口的功能,附加了 Pop、AddIfNotPresent、HasSynced、Close方法。Store是一个通用的对象存储和处理的接口,本身提供了 Add、Update、List、Get等方法,Queue接口增加了 Pop方法,实现了一个基本 FIFO队列,具体见代码清单 2-42。

typeQueueinterface{Store

Pop(PopProcessFunc)(interface{},error)AddIfNotPresent(interface{})errorHasSynced()bool

Close()

}

 

下面我们来看一下FIFO队列的基本功能是怎么实现的。首先是 Add方法,我们可以看到 Add方法会先根据 KeyFunc计算出对象的 Key,如果队列中没有这个对象,我们就在这个队列尾部增补对象的Key,并且将这个对象存入 Map,具体见代码清2-43

func(f*FIFO)Add(objinterface{})error{id,err:=f.keyFunc(obj)

iferr!=nil{

returnKeyError{obj,err}

}

f.lock.Lock()

deferf.lock.Unlock()f.populated=true

if _,exists:=f.items[id];!exists{f.queue=append(f.queue,id)

}

f.items[id]=objf.cond.Broadcast()returnnil


}

 

接下来我们看一下 Pop方法, 在 Queue中至少有一个资源时才会进行 Pop作。在处理资源之前,资源会从队列(和存储)中移除,如果未成功处理资源,应该用AddIfNotPresent()函数将资源添加回队列。处理逻辑由 PopProcessFunc进行执行,具体见代码清单2-44。

func(f*DeltaFIFO)Pop(processPopProcessFunc)(interface{},error){

f.lock.Lock()

deferf.lock.Unlock()for{

forlen(f.queue)==0{

//Whenthequeueisempty,invocationofPop()isblockeduntilnewitemisenqueued.

//WhenClose()iscalled,thef.closedissetandtheconditionisbroadcasted.

//Whichcausesthislooptocontinueandreturnfromthe

Pop().


iff.IsClosed(){

returnnil,FIFOClosedError


}

 

f.cond.Wait()

}

id:=f.queue[0]

 

f.queue=f.queue[1:]item,ok:=f.items[id]

iff.initialPopulationCount>0{f.initialPopulationCount--

}

if!ok{

//Itemmayhavebeendeletedsubsequently.continue

}

delete(f.items,id)err:=process(item)

if e,ok:=err.(ErrRequeue);ok{f.addIfNotPresent(id,item)err=e.Err

}

//Don'tneedtocopyDeltashere,becausewe'retransferring

//ownershiptothecaller.returnitem,err

}

}

 

func(f*DeltaFIFO)KeyOf(objinterface{})(string,error)

if d,ok:=obj.(Deltas);ok{iflen(d)==0{

return"",KeyError{obj,ErrZeroLengthDeltasObject}

}

obj=d.Newest().Object

}

ifd,ok:=obj.(DeletedFinalStateUnknown);ok{

returnd.Key,nil

}

returnf.keyFunc(obj)

}

 

值得注意的是DeltaFIFO中用于计算对象键的函数KeyOf为什么要先进行一次Deltas的类型转换呢?是因为Pop 出去的对象很可能还要再添加进来(比如处理失败需要再放进来,此时添加的对象就是已经封装好的Delta对象了。至此,已实现DeltaFIFO基本功能。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
6月前
|
运维 监控 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
国诚投顾携手阿里云,依托Serverless架构实现技术全面升级,构建高弹性、智能化技术底座,提升业务稳定性与运行效率。通过云原生API网关、微服务治理与智能监控,实现流量精细化管理与系统可观测性增强,打造安全、敏捷的智能投顾平台,助力行业数字化变革。
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
|
8月前
|
Kubernetes Cloud Native 安全
云原生机密计算新范式 PeerPods技术方案在阿里云上的落地和实践
PeerPods 技术价值已在阿里云实际场景中深度落地。
|
6月前
|
运维 监控 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生 Serverless 实践
通过与阿里云深度合作,国诚投顾完成了从传统 ECS 架构向云原生 Serverless 架构的全面转型。新的技术架构不仅解决了原有系统在稳定性、弹性、运维效率等方面的痛点,还在成本控制、API 治理、可观测性、DevOps 自动化等方面实现了全方位升级。
|
8月前
|
Kubernetes Cloud Native 安全
云原生机密计算新范式 PeerPods 技术方案在阿里云上的落地和实践
PeerPods 技术价值已在阿里云实际场景中深度落地。
|
4月前
|
人工智能 Cloud Native 算法
拔俗云原生 AI 临床大数据平台:赋能医学科研的开发者实践
AI临床大数据科研平台依托阿里云、腾讯云,打通医疗数据孤岛,提供从数据治理到模型落地的全链路支持。通过联邦学习、弹性算力与安全合规技术,实现跨机构协作与高效训练,助力开发者提升科研效率,推动医学AI创新落地。(238字)
310 7
|
6月前
|
弹性计算 运维 Cloud Native
【云故事探索】NO.17:国诚投顾的云原生Serverless实践
简介: 通过与阿里云深度合作,国诚投顾完成了从传统 ECS 架构向云原生 Serverless 架构的全面转型。新的技术架构不仅解决了原有系统在稳定性、弹性、运维效率等方面的痛点,还在成本控制、API 治理、可观测性、DevOps 自动化等方面实现了全方位升级。
178 1
|
5月前
|
存储 弹性计算 Cloud Native
云原生数据库的演进与应用实践
随着企业业务扩展,传统数据库难以应对高并发与弹性需求。云原生数据库应运而生,具备计算存储分离、弹性伸缩、高可用等核心特性,广泛应用于电商、金融、物联网等场景。阿里云PolarDB、Lindorm等产品已形成完善生态,助力企业高效处理数据。未来,AI驱动、Serverless与多云兼容将推动其进一步发展。
271 8
|
7月前
|
Cloud Native 中间件 调度
云原生信息提取系统:容器化流程与CI/CD集成实践
本文介绍如何通过工程化手段解决数据提取任务中的稳定性与部署难题。结合 Scrapy、Docker、代理中间件与 CI/CD 工具,构建可自动运行、持续迭代的云原生信息提取系统,实现结构化数据采集与标准化交付。
379 1
云原生信息提取系统:容器化流程与CI/CD集成实践
|
7月前
|
人工智能 安全 Java
Go与Java泛型原理简介
本文介绍了Go与Java泛型的实现原理。Go通过单态化为不同类型生成函数副本,提升运行效率;而Java则采用类型擦除,将泛型转为Object类型处理,保持兼容性但牺牲部分类型安全。两种机制各有优劣,适用于不同场景。
276 24