Kubernetes 集群的大脑 Controller Manager

Kubernetes 集群的大脑 Controller Manager

Kubernetes 是一个声明式的系统。我们在使用 Kubernetes 管理应用、部署服务时,通常会使用一个 YAML 格式的文件去描述期望应用部署后的最终状态。

当这个文件被提交到 Kubernetes 后,我们神奇地发现 Kubernetes 在不停地创建各种资源,直到达到我们所描述的状态。实现这个功能的组件就是我们今天讨论的 kube-controller-manager,Kubernetes 集群的大脑。

我们平时所见到的 Kubernetes 集群中的节点(Node)、Pod、服务(Service)、端点(Endpoint)、命名空间(Namespace)、服务账户(ServiceAccount)、资源定额(ResourceQuota) 等资源都是由 kube-controller-manager 管理的。

kube-controller-manager 为了管理这些资源,内置了一些控制器来实现,如 DeploymentControllers 控制器、StatefulSet 控制器、Namespace 控制器及 PersistentVolume 控制器等这些控制器都分别对应着对应各自的资源 DeploymentStatefulSetNamespacePersistentVolume 等。

这些 Controller 使用 Informer 机制,从 kube-apiserver 实时监控着某些特定的资源对象,获取它们当前的状态,对它们进行对比、修正、收敛,来使这些对象的状态不断靠近、直至达成在它们的声明语义中所期望的目标状态。

高可用

kube-controller-manager 在 Kubernetes 集群中以多实例运行实现高可用,但多个实例中只有作为 Leader 的实现会执行 Controller 逻辑。非 Leader 节点作为热备节点,只有当 Leader 节点因为某种原因故障后,非 Leader 节点经过选举成为新 Leader 接替执行 Controller 逻辑。

kube-controller-manager 选举的方式就使用了《Kubernetes 核心组件 Leader 选举机制》所介绍的选举机制。

启动

kube-controller-manager 的入口在 cmd/kube-controller-manager/app/controllermanager.go 文件中,是 cobra.Command 类型的入口。

cmd/kube-controller-manager/app/controllermanager.go:104
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func NewControllerManagerCommand() *cobra.Command {
s, err := options.NewKubeControllerManagerOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}

cmd := &cobra.Command{
Use: "kube-controller-manager",
// ...略过
RunE: func(cmd *cobra.Command, args []string) error {
// 略过
return Run(c.Complete(), wait.NeverStop)
},
// ...略过
}

fs := cmd.Flags()
// 命令行参数注册,略过
return cmd
}

这段代码会进入 Run 方法中执行真正的 kube-controller-manager 逻辑,Run 方法中实际执行业务的代码是内部的 run 匿名方法。

cmd/kube-controller-manager/app/controllermanager.go:226
1
2
3
run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
// ...
}

run 代码后面的部分是进行选主处理的,如果没开启选主功能,直接运行 run

cmd/kube-controller-manager/app/controllermanager.go:244
1
2
3
4
5
6
// No leader election, run directly
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
ctx, _ := wait.ContextForChannel(stopCh)
run(ctx, saTokenControllerInitFunc, NewControllerInitializers)
return nil
}

如果启用的选主,则会启动协程,使用 client-go 的选主组件进行选主,并在成为主节点后也运行 run,主流程使用 <-stopCh 阻塞:

cmd/kube-controller-manager/app/controllermanager.go:280
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Start the main lock
go leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
initializersFunc := NewControllerInitializers
if leaderMigrator != nil {
// If leader migration is enabled, we should start only non-migrated controllers
// for the main lock.
initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
klog.Info("leader migration: starting main controllers.")
}
run(ctx, startSATokenController, initializersFunc)
},
OnStoppedLeading: func() {
klog.ErrorS(nil, "leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
},
})

//...省略
<-stopCh

run 方法实现如下:

cmd/kube-controller-manager/app/controllermanager.go:226
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
}
// 初始化要运行的 Controller
controllerInitializers := initializersFunc(controllerContext.LoopMode)
// 启动所有 Controller
if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}

// 启动 informer
controllerContext.InformerFactory.Start(stopCh)
controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
close(controllerContext.InformersStarted)

<-ctx.Done()
}

run 方法使用传入的 initializersFunc 函数获取要启动的 Controller 列表,默认的 initializersFunc 指向的是 NewControllerInitializers 函数,这个函数会构造一个 map[string]InitFunc 类型的 Map,Key 是 Controller 的名字,Value 是 Controller 的启动函数。

StartControllers 方法遍历上面构造的 map[string]InitFunc,启动所有 Controller。

Informer 注意点

Informer 的启动是在 Controller 启动完后才启动的,Controller 会在 Informer 同步完后才会真正的启动。如下是 DeploymentController 启动时,会等待 Informer 同步完才执行 worker 逻辑:

pkg/controller/deployment/deployment_controller.go:157
1
2
3
if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
return
}

启动 Controller

启动的代码很简单:

cmd/kube-controller-manager/app/controllermanager.go:567
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
// 先启动 SATokenController,因为这个 Controller 会为其它 Controller 构造 Token
// 如果启动失败则中止启动流程
if startSATokenController != nil {
if _, _, err := startSATokenController(ctx, controllerCtx); err != nil {
return err
}
}

// Initialize the cloud provider with a reference to the clientBuilder only after token controller
// has started in case the cloud provider uses the client builder.
if controllerCtx.Cloud != nil {
controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done())
}

var controllerChecks []healthz.HealthChecker

// 遍历 Controller 列表
for controllerName, initFn := range controllers {
if !controllerCtx.IsControllerEnabled(controllerName) {
klog.Warningf("%q is disabled", controllerName)
continue
}
// 加点时间间隔
time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

// 启动 Controller
klog.V(1).Infof("Starting %q", controllerName)
ctrl, started, err := initFn(ctx, controllerCtx)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
klog.Warningf("Skipping %q", controllerName)
continue
}
// ...省略配置 Check 代码
controllerChecks = append(controllerChecks, check)

klog.Infof("Started %q", controllerName)
}

healthzHandler.AddHealthChecker(controllerChecks...)

return nil
}

在启动 Controller 前,先启动的 SATokenController(实际上是 TokensController,用来管理 ServiceAccount 的 ServiceAccountToken),因为这个 Controller 会为其它 Controller 构造 Token,如果启动失败则中止启动流程,因为其它 Controller 取不到 Token 没必要启动了。

总结

kube-controller-manager 最主要的逻辑还是管理众多 Kubernetes 资源的 Controller,作为引子本文只讨论了启动相关的内容。感兴趣可以在 cmd/kube-controller-manager/app/controllermanager.go:428NewControllerInitializers() 方法查看 kube-controller-manager 管理的 Controller 列表。

后面的文章会基于最经典的 DeploymentController 源码,看看 Deployment 是怎么实现的。

作者

Jakes Lee

发布于

2023-10-11

更新于

2023-10-19

许可协议

评论