【K8s源码品读】012:Phase 1 - kube-controller-manager - 了解控制管理中心

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 理解 kube-controller-manager 的运行机制

聚焦目标

理解 kube-controller-manager 的运行机制

目录

  1. 运行的主函数
  2. 控制器的启动函数
  3. 引入概念ReplicaSet
  4. 查看ReplicaSetController
  5. ReplicaSet的核心实现函数
  6. 总结

Run

我们找到了对应的主函数,看看其中的内容

func Run(c *config.CompletedConfig, stopCh <-chan struct{
   }) error {
   
    // configz 模块,在kube-scheduler分析中已经了解
    if cfgz, err := configz.New(ConfigzName); err == nil {
   
        cfgz.Set(c.ComponentConfig)
    } else {
   
        klog.Errorf("unable to register configz: %v", err)
    }

    // 健康监测与http服务,跳过
    var checks []healthz.HealthChecker
    var unsecuredMux *mux.PathRecorderMux

    run := func(ctx context.Context) {
   
        rootClientBuilder := controller.SimpleControllerClientBuilder{
   
            ClientConfig: c.Kubeconfig,
        }

    // client认证相关
        var clientBuilder controller.ControllerClientBuilder

    // 创建controller的上下文context
        controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
        if err != nil {
   
            klog.Fatalf("error building controller context: %v", err)
        }
        saTokenControllerInitFunc := serviceAccountTokenControllerStarter{
   rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

        if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
   
            klog.Fatalf("error starting controllers: %v", err)
        }

    // 这里的 InformerFactory 和我们在kube-scheduler中看的 SharedInformerFactory 基本一致
        controllerContext.InformerFactory.Start(controllerContext.Stop)
        controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
        close(controllerContext.InformersStarted)

        select {
   }
    }

  // 是否进行选举
    if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
   
        run(context.TODO())
        panic("unreachable")
    }

  // 拼接出一个全局唯一的id
    id, err := os.Hostname()
    if err != nil {
   
        return err
    }
    id = id + "_" + string(uuid.NewUUID())

    rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
        c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
        c.ComponentConfig.Generic.LeaderElection.ResourceName,
        c.LeaderElectionClient.CoreV1(),
        c.LeaderElectionClient.CoordinationV1(),
        resourcelock.ResourceLockConfig{
   
            Identity:      id,
            EventRecorder: c.EventRecorder,
        })
    if err != nil {
   
        klog.Fatalf("error creating lock: %v", err)
    }

  // 正常情况下都是阻塞在RunOrDie这个函数中,不停地进行选举相关的工作
    leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
   
        Lock:          rl,
        LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
        Callbacks: leaderelection.LeaderCallbacks{
   
      // 开始成为Leader的时候,调用run函数
            OnStartedLeading: run,
            OnStoppedLeading: func() {
   
                klog.Fatalf("leaderelection lost")
            },
        },
        WatchDog: electionChecker,
        Name:     "kube-controller-manager",
    })
    panic("unreachable")
}

StartControllers

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
   
    // 关键性的循环,启动每个controllers,key为控制器名字,value为初始化函数
    for controllerName, initFn := range controllers {
   
    // 是否允许启动
        if !ctx.IsControllerEnabled(controllerName) {
   
            klog.Warningf("%q is disabled", controllerName)
            continue
        }
        time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
        klog.V(1).Infof("Starting %q", controllerName)
    // 调用init函数进行启动
        debugHandler, started, err := initFn(ctx)
        if err != nil {
   
            klog.Errorf("Error starting %q", controllerName)
            return err
        }
        if !started {
   
            klog.Warningf("Skipping %q", controllerName)
            continue
        }
    // 注册对应controller到debug的url中
        if debugHandler != nil && unsecuredMux != nil {
   
            basePath := "/debug/controllers/" + controllerName
            unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
            unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
        }
        klog.Infof("Started %q", controllerName)
    }

    return nil
}

// 我们再去传入controller的函数去看看,对应的controller有哪些,这里有我们很多常见的概念,今天不一一细讲
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
   
    controllers := map[string]InitFunc{
   }
    controllers["endpoint"] = startEndpointController
    controllers["endpointslice"] = startEndpointSliceController
    controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefulset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["csrsigning"] = startCSRSigningController
    controllers["csrapproving"] = startCSRApprovingController
    controllers["csrcleaner"] = startCSRCleanerController
    controllers["ttl"] = startTTLController
    controllers["bootstrapsigner"] = startBootstrapSignerController
    controllers["tokencleaner"] = startTokenCleanerController
    controllers["nodeipam"] = startNodeIpamController
    controllers["nodelifecycle"] = startNodeLifecycleController
    if loopMode == IncludeCloudLoops {
   
        controllers["service"] = startServiceController
        controllers["route"] = startRouteController
        controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
    }
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController
    controllers["ttl-after-finished"] = startTTLAfterFinishedController
    controllers["root-ca-cert-publisher"] = startRootCACertPublisher
    controllers["ephemeral-volume"] = startEphemeralVolumeController

    return controllers
}

ReplicaSet

由于我们的示例是创建一个nginx的pod,涉及到kube-controller-manager的内容很少。

但是,为了加深大家对 kube-controller-manager 的认识,我们引入一个新的概念 - ReplicaSet,下面是官方说明:

A ReplicaSet's purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.

ReplicaSet 的目的是维护一组在任何时候都处于运行状态的 Pod 副本的稳定集合。 因此,它通常用来保证给定数量的、完全相同的 Pod 的可用性。

简单来说,ReplicaSet 就是用来生成指定个数的Pod。

ReplicaSetController

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
   
    if !ctx.AvailableResources[schema.GroupVersionResource{
   Group: "apps", Version: "v1", Resource: "replicasets"}] {
   
        return nil, false, nil
    }

  // 用goroutine异步运行,包含了 ReplicaSet和Pod 的两个Informer
  // 这一点很好理解:我们是要控制ReplicaSet声明的数量和运行的Pod数量一致,需要同时观察者两种资源
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}

// 运行函数
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{
   }) {
   
    defer utilruntime.HandleCrash()
    defer rsc.queue.ShutDown()

    controllerName := strings.ToLower(rsc.Kind)
    klog.Infof("Starting %v controller", controllerName)
    defer klog.Infof("Shutting down %v controller", controllerName)

    if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
   
        return
    }

    for i := 0; i < workers; i++ {
   
    // 工作的函数
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}

func (rsc *ReplicaSetController) worker() {
   
  // 继续查找实现
    for rsc.processNextWorkItem() {
   
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
   
  // 这里也有个queue的概念,可以类比kube-scheduler中的实现
  // 不同的是,这里的queue是 workqueue.RateLimitingInterface ,也就是限制速率的,具体实现今天不细看

  // 获取元素
    key, quit := rsc.queue.Get()
    if quit {
   
        return false
    }
    defer rsc.queue.Done(key)

  // 处理对应的元素
    err := rsc.syncHandler(key.(string))
    if err == nil {
   
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}

// 再回过头,去查看syncHandler的具体实现
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
   

    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}

syncReplicaSet

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
   
    startTime := time.Now()
    defer func() {
   
        klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
    }()

    // 从key中拆分出 namespace 和 name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
   
        return err
    }

  // 根据name,从 Lister 获取对应的 ReplicaSets 信息
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
   
        klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
   
        return err
    }

    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
  // 获取 selector (k8s 是根据selector中的label来匹配 ReplicaSets 和 Pod 的)
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
   
        utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
        return nil
    }

    // 根据namespace和labels获取所有的pod
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
   
        return err
    }

  // 过滤无效的pod
    filteredPods := controller.FilterActivePods(allPods)

    // 根据selector再过滤pod
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    if err != nil {
   
        return err
    }

    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
   
    // 管理 ReplicaSet,下面详细分析
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    // 更新状态
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
   
        return err
    }
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
   
        rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}

// 我们再一起看看,当Pod数量和ReplicaSet中声明的不同时,是怎么工作的
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
   
    // diff = 当前pod数 - 期望pod数
  diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
   
        utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
        return nil
    }

  // diff小于0,表示需要扩容,即新增Pod
    if diff < 0 {
   

    // 具体的实现暂时不细看

  // diff 大于0,即需要缩容
    } else if diff > 0 {
   

    }

    return nil
}

Summary

kube-controller-manager 的核心思想是: 根据期望状态当前状态,管理Kubernetes中的资源。

以ReplicaSet为例,它对比了定义声明的Pod数当前集群中满足条件的Pod数,进行相对应的扩缩容。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
Kubernetes 算法 调度
|
存储 Kubernetes 安全
【K8s源码品读】010:Phase 1 - kube-scheduler - Informer是如何保存数据的
了解Informer在发现资源变化后,是怎么处理的
55 0
|
Kubernetes 容器
【K8s源码品读】009:Phase 1 - kube-scheduler - Informer监听资源变化
了解Informer是如何从kube-apiserver监听资源变化的情况
103 0
|
10天前
|
JSON Kubernetes 容灾
ACK One应用分发上线:高效管理多集群应用
ACK One应用分发上线,主要介绍了新能力的使用场景
|
11天前
|
Kubernetes 持续交付 开发工具
ACK One GitOps:ApplicationSet UI简化多集群GitOps应用管理
ACK One GitOps新发布了多集群应用控制台,支持管理Argo CD ApplicationSet,提升大规模应用和集群的多集群GitOps应用分发管理体验。
|
1月前
|
Kubernetes Cloud Native 云计算
云原生之旅:Kubernetes 集群的搭建与实践
【8月更文挑战第67天】在云原生技术日益成为IT行业焦点的今天,掌握Kubernetes已成为每个软件工程师必备的技能。本文将通过浅显易懂的语言和实际代码示例,引导你从零开始搭建一个Kubernetes集群,并探索其核心概念。无论你是初学者还是希望巩固知识的开发者,这篇文章都将为你打开一扇通往云原生世界的大门。
102 17
|
26天前
|
Kubernetes 应用服务中间件 nginx
搭建Kubernetes v1.31.1服务器集群,采用Calico网络技术
在阿里云服务器上部署k8s集群,一、3台k8s服务器,1个Master节点,2个工作节点,采用Calico网络技术。二、部署nginx服务到k8s集群,并验证nginx服务运行状态。
294 1
|
1月前
|
Kubernetes Cloud Native 微服务
微服务实践之使用 kube-vip 搭建高可用 Kubernetes 集群
微服务实践之使用 kube-vip 搭建高可用 Kubernetes 集群
86 1

推荐镜像

更多