Kubernetes 是一个声明式的系统。我们在使用 Kubernetes 管理应用、部署服务时,通常会使用一个 YAML 格式的文件去描述期望应用部署后的最终状态。
当这个文件被提交到 Kubernetes 后,我们神奇地发现 Kubernetes 在不停地创建各种资源,直到达到我们所描述的状态。实现这个功能的组件就是我们今天讨论的 kube-controller-manager,Kubernetes 集群的大脑。
我们平时所见到的 Kubernetes 集群中的节点(Node
)、Pod
、服务(Service
)、端点(Endpoint
)、命名空间(Namespace
)、服务账户(ServiceAccount
)、资源定额(ResourceQuota
) 等资源都是由 kube-controller-manager 管理的。
kube-controller-manager 为了管理这些资源,内置了一些控制器来实现,如 DeploymentControllers
控制器、StatefulSet
控制器、Namespace
控制器及 PersistentVolume
控制器等这些控制器都分别对应着对应各自的资源 Deployment
、StatefulSet
、Namespace
和 PersistentVolume
等。
这些 Controller 使用 Informer 机制 ,从 kube-apiserver 实时监控着某些特定的资源对象,获取它们当前的状态,对它们进行对比、修正、收敛,来使这些对象的状态不断靠近、直至达成在它们的声明语义中所期望的目标状态。
高可用 kube-controller-manager 在 Kubernetes 集群中以多实例运行实现高可用,但多个实例中只有作为 Leader 的实现会执行 Controller 逻辑。非 Leader 节点作为热备节点,只有当 Leader 节点因为某种原因故障后,非 Leader 节点经过选举成为新 Leader 接替执行 Controller 逻辑。
kube-controller-manager 选举的方式就使用了《Kubernetes 核心组件 Leader 选举机制 》所介绍的选举机制。
启动 kube-controller-manager 的入口在 cmd/kube-controller-manager/app/controllermanager.go
文件中,是 cobra.Command
类型的入口。
cmd/kube-controller-manager/app/controllermanager.go:104 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func NewControllerManagerCommand () *cobra.Command { s, err := options.NewKubeControllerManagerOptions() if err != nil { klog.Fatalf("unable to initialize command options: %v" , err) } cmd := &cobra.Command{ Use: "kube-controller-manager" , RunE: func (cmd *cobra.Command, args []string ) error { return Run(c.Complete(), wait.NeverStop) }, } fs := cmd.Flags() return cmd }
这段代码会进入 Run
方法中执行真正的 kube-controller-manager 逻辑,Run
方法中实际执行业务的代码是内部的 run
匿名方法。
cmd/kube-controller-manager/app/controllermanager.go:226 1 2 3 run := func (ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) { }
run
代码后面的部分是进行选主处理的,如果没开启选主功能,直接运行 run
。
cmd/kube-controller-manager/app/controllermanager.go:244 1 2 3 4 5 6 if !c.ComponentConfig.Generic.LeaderElection.LeaderElect { ctx, _ := wait.ContextForChannel(stopCh) run(ctx, saTokenControllerInitFunc, NewControllerInitializers) return nil }
如果启用的选主,则会启动协程,使用 client-go 的选主组件 进行选主,并在成为主节点后也运行 run
,主流程使用 <-stopCh
阻塞:
cmd/kube-controller-manager/app/controllermanager.go:280 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 go leaderElectAndRun(c, id, electionChecker, c.ComponentConfig.Generic.LeaderElection.ResourceLock, c.ComponentConfig.Generic.LeaderElection.ResourceName, leaderelection.LeaderCallbacks{ OnStartedLeading: func (ctx context.Context) { initializersFunc := NewControllerInitializers if leaderMigrator != nil { initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated) klog.Info("leader migration: starting main controllers." ) } run(ctx, startSATokenController, initializersFunc) }, OnStoppedLeading: func () { klog.ErrorS(nil , "leaderelection lost" ) klog.FlushAndExit(klog.ExitFlushTimeout, 1 ) }, }) <-stopCh
run
方法实现如下:
cmd/kube-controller-manager/app/controllermanager.go:226 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 run := func (ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) { controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done()) if err != nil { klog.Fatalf("error building controller context: %v" , err) } controllerInitializers := initializersFunc(controllerContext.LoopMode) if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil { klog.Fatalf("error starting controllers: %v" , err) } controllerContext.InformerFactory.Start(stopCh) controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh) close (controllerContext.InformersStarted) <-ctx.Done() }
run
方法使用传入的 initializersFunc
函数获取要启动的 Controller 列表,默认的 initializersFunc
指向的是 NewControllerInitializers
函数,这个函数会构造一个 map[string]InitFunc
类型的 Map,Key 是 Controller 的名字,Value 是 Controller 的启动函数。
StartControllers
方法遍历上面构造的 map[string]InitFunc
,启动所有 Controller。
Informer 的启动是在 Controller 启动完后才启动的,Controller 会在 Informer 同步完后才会真正的启动。如下是 DeploymentController
启动时,会等待 Informer 同步完才执行 worker 逻辑:
pkg/controller/deployment/deployment_controller.go:157 1 2 3 if !cache.WaitForNamedCacheSync("deployment" , ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { return }
启动 Controller 启动的代码很简单:
cmd/kube-controller-manager/app/controllermanager.go:567 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 func StartControllers (ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map [string ]InitFunc, unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error { if startSATokenController != nil { if _, _, err := startSATokenController(ctx, controllerCtx); err != nil { return err } } if controllerCtx.Cloud != nil { controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done()) } var controllerChecks []healthz.HealthChecker for controllerName, initFn := range controllers { if !controllerCtx.IsControllerEnabled(controllerName) { klog.Warningf("%q is disabled" , controllerName) continue } time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) klog.V(1 ).Infof("Starting %q" , controllerName) ctrl, started, err := initFn(ctx, controllerCtx) if err != nil { klog.Errorf("Error starting %q" , controllerName) return err } if !started { klog.Warningf("Skipping %q" , controllerName) continue } controllerChecks = append (controllerChecks, check) klog.Infof("Started %q" , controllerName) } healthzHandler.AddHealthChecker(controllerChecks...) return nil }
在启动 Controller 前,先启动的 SATokenController(实际上是 TokensController
,用来管理 ServiceAccount 的 ServiceAccountToken),因为这个 Controller 会为其它 Controller 构造 Token,如果启动失败则中止启动流程,因为其它 Controller 取不到 Token 没必要启动了。
总结 kube-controller-manager 最主要的逻辑还是管理众多 Kubernetes 资源的 Controller,作为引子本文只讨论了启动相关的内容。感兴趣可以在 cmd/kube-controller-manager/app/controllermanager.go:428
的 NewControllerInitializers()
方法查看 kube-controller-manager 管理的 Controller 列表。
后面的文章会基于最经典的 DeploymentController
源码,看看 Deployment 是怎么实现的。