开发 k8s 管理平台 - k8sailor 13. 使用 k8s informer 订阅集群事件

简介: 开发 k8s 管理平台 - k8sailor 13. 使用 k8s informer 订阅集群事件

开发 k8s 管理平台 - k8sailor 13. 使用 k8s informer 订阅集群事件

原文地址: https://tangx.in/posts/books/k8sailor/chapter02/13-k8s-informer/
tag: https://github.com/tangx/k8sailor/tree/feat/13-k8s-informer

informer-design.jpeg

从应用层面来说, 创建 informer 并启动之后就与 k8s cluster 创建了一个长链接并订阅了 某个资源 Resource 的变化。

至于订阅后得到的数据要怎么用完全取决于订阅者的业务设计。

Shared Informer Factory 共享机制

Informer 又称为 Shared Informer,表明是可以共享使用的,在使用 client-go 写代码时,若同一资源的 Informer 被实例化太多次,每个 Informer 使用一个 Reflector,会运行过多的相同 ListAndWatch(即图中的第一步),太多重复的序列化和反序列化会导致 k8s API Server 负载过重。

而 Shared Informer 通过对同一类资源 Informer 共享一个 Reflector 可以节约很多资源,这通过 map 数据结构即可实现这样一个共享 Informer 机制。

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
  1. informer.Run(stopCh) : informer 在启动的时候需要传入一个 通知停止的 channel stopCh <-chan struct{}。 因此用户是可以 主动 关闭 informer 通道的。
  2. 所有 informer 都是通过 go 协程跑在后台的。

Shared Informer Factory 注册 infomers

上面提到, Shared Informer Factory 一个很重要的作用就是

  1. 管理注册 Informer
  2. 防止相同类型的 Infomer 重复注册
// WithEventHandlers 注册 handler
func (inf *Informer) WithEventHandlers(handlers ...InformerEventHandler) *Informer {
    for _, handler := range handlers {
        kind := handler.InformerKind()
        switch kind {
        case "deployment":
            inf.factory.Apps().V1().Deployments().Informer().AddEventHandler(handler)
        case "pod":
            inf.factory.Core().V1().Pods().Informer().AddEventHandler(handler)
        }
    }

    return inf
}

informer event handler

所有的 informer handler 满足接口

type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}

即处理 OnAdd, OnUpdate, OnDelete 三种事件。

项目案例

例如在本项目中

k8sailor-with-informer.png

在本地创建了一个名为 k8scache 的存储空间, 使用 k8s informer 订阅了 Deployment 的数据并保存到了 本地 并对外提供 deployment 的查询功能。

由于项目本地都存的是数据副本,只提供了 查询 namespace 下的所有数据根据 name 查询某个 deployment 这样简单的功能。 原本 k8s 通过 label 查询 这样 好用 功能, 目前也就无法再提供了。

自此, biz 代码逻辑中与 Deployment 相关的查询使用的数据源都修改为 k8scache

// GetDeploymentByName 通过名称获取 deployment
func GetDeploymentByName(ctx context.Context, input GetDeploymentByNameInput) (*Deployment, error) {

    /* k8s api 返回的数据 */
    // v1dep, err := k8sdao.GetDeploymentByName(ctx, input.Namespace, input.Name)

    /* 使用本地的 k8scache */
    v1dep, err := k8scache.DepTank.GetDeploymentByName(ctx, input.Namespace, input.Name)
    if err != nil {
        return nil, err
    }

    dep := extractDeployment(*v1dep)
    return dep, nil
}

informer 启动

上面已经说了, 使用 informer 创建的是一个独立 应用/模块, 可以作为一个 模块存在于应用内部, 也可以作为一个 独立的应用

无论是怎么定义的, informer 的启动和关闭都 必须要 独立控制。

本项目中是作为一个模块, 在启动 http server 之前进行启动。

var cmdHttpserver = &cobra.Command{
    Use:  "httpserver",
    Long: "启动 web 服务器",
    Run: func(cmd *cobra.Command, args []string) {
        // 启动 informer
        runInformer()

        // 启动服务
        runHttpserver()
    },
}

func runInformer() {

    clientset := global.KubeClient.Client()
    informer := global.KubeInformer.WithClientset(clientset)

    k8scache.RegisterHandlers(informer)

    informer.Start()
}

分层后代码实现的问题

其实上述 切换数据源 的实现是有问题的。

数据源的切换不应该是在 biz 中完成, 而是应该在 k8sdao 中进行。

  1. 从逻辑上来说 k8scachek8s 是一层的, 都是数据的提供者。
  2. DAO 的含义是 数据访问对象 data access object, 其职责就是对 来自不同数据源的相同数据 执行 适合自身业务逻辑的抽象和封装

扩展阅读

深入理解 k8s informer: https://cloudnative.to/blog/client-go-informer-source-code/
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
1天前
|
存储 Kubernetes 关系型数据库
阿里云ACK备份中心,K8s集群业务应用数据的一站式灾备方案
本文源自2024云栖大会苏雅诗的演讲,探讨了K8s集群业务为何需要灾备及其重要性。文中强调了集群与业务高可用配置对稳定性的重要性,并指出人为误操作等风险,建议实施周期性和特定情况下的灾备措施。针对容器化业务,提出了灾备的新特性与需求,包括工作负载为核心、云资源信息的备份,以及有状态应用的数据保护。介绍了ACK推出的备份中心解决方案,支持命名空间、标签、资源类型等维度的备份,并具备存储卷数据保护功能,能够满足GitOps流程企业的特定需求。此外,还详细描述了备份中心的使用流程、控制台展示、灾备难点及解决方案等内容,展示了备份中心如何有效应对K8s集群资源和存储卷数据的灾备挑战。
|
22天前
|
Kubernetes 监控 Cloud Native
Kubernetes集群的高可用性与伸缩性实践
Kubernetes集群的高可用性与伸缩性实践
56 1
|
2月前
|
JSON Kubernetes 容灾
ACK One应用分发上线:高效管理多集群应用
ACK One应用分发上线,主要介绍了新能力的使用场景
|
1月前
|
存储 运维 Kubernetes
云端迁移:备份中心助力企业跨云迁移K8s容器服务平台
本文将简要介绍阿里云容器服务ACK的备份中心,并以某科技公司在其实际的迁移过程中遇到具体挑战为例,阐述如何有效地利用备份中心来助力企业的容器服务平台迁移项目。
|
2月前
|
Kubernetes 持续交付 开发工具
ACK One GitOps:ApplicationSet UI简化多集群GitOps应用管理
ACK One GitOps新发布了多集群应用控制台,支持管理Argo CD ApplicationSet,提升大规模应用和集群的多集群GitOps应用分发管理体验。
|
2月前
|
Kubernetes Ubuntu Linux
Centos7 搭建 kubernetes集群
本文介绍了如何搭建一个三节点的Kubernetes集群,包括一个主节点和两个工作节点。各节点运行CentOS 7系统,最低配置为2核CPU、2GB内存和15GB硬盘。详细步骤包括环境配置、安装Docker、关闭防火墙和SELinux、禁用交换分区、安装kubeadm、kubelet、kubectl,以及初始化Kubernetes集群和安装网络插件Calico或Flannel。
194 4
|
2月前
|
Kubernetes Cloud Native 云计算
云原生之旅:Kubernetes 集群的搭建与实践
【8月更文挑战第67天】在云原生技术日益成为IT行业焦点的今天,掌握Kubernetes已成为每个软件工程师必备的技能。本文将通过浅显易懂的语言和实际代码示例,引导你从零开始搭建一个Kubernetes集群,并探索其核心概念。无论你是初学者还是希望巩固知识的开发者,这篇文章都将为你打开一扇通往云原生世界的大门。
131 17
|
2月前
|
Kubernetes 应用服务中间件 nginx
搭建Kubernetes v1.31.1服务器集群,采用Calico网络技术
在阿里云服务器上部署k8s集群,一、3台k8s服务器,1个Master节点,2个工作节点,采用Calico网络技术。二、部署nginx服务到k8s集群,并验证nginx服务运行状态。
798 1
|
2月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
80 3
|
2月前
|
Kubernetes Docker 微服务
微服务实践k8s&dapr开发部署实验(1)服务调用(一)
微服务实践k8s&dapr开发部署实验(1)服务调用(一)
49 2