深入学习 Deployment 实现

Deployment 是 Kubernetes 三个常用工作负载中最常用的。Deployment 用于管理应用的部署情况,可以实现应用的滚动升级和回滚,还能实现应用的扩缩容。

Deployment 通过 ReplicaSet 来管理 Pod。一个完整的 Deployment 创建到 Pod 被拉起的流程由多个控制器协同完成:

Deployment 处理流程

当用户创建 Deployment 时通过 kubectl 等客户端调用 API Server:

  • API Server 对请求进行认证,最终创建一个 Deloyment 对象,此时会产生 Deplyment 创建事件
  • DeploymentController 监听到事件后,创建 ReplicaSet 对象(由 dc.syncDeployment 方法实现),产生 ReplicaSet 创建事件;
  • ReplicaSetController 监听到 ReplicaSet 创建事件,创建 Pod 对象(由 syncReplicaSet 方法实现),产生 Pod 创建事件;
    • 此时 PodSpec.nodeName 为空;
  • scheduler 监听到 Pod 创建事件并对 Spec.nodeName 为空的 Pod 执行调度逻辑,选定节点后更新 PodSpec.nodeName,产生 Pod 更新事件(由 schedulesched.scheduleOne 方法实现);
  • kubelet 监听到 Pod 更新事件,判断 PodSpec.nodeName 是否是当前节点,匹配后按 Pod 的定义启动容器,同时更新 PodStatus(由 KubeletsyncPod 实现)。

Controller 实例的构造

前文《Kubernetes 集群的大脑 Controller Manager》里介绍过 kube-controller-manager 启动内置控制器的方法。而在 NewControllerInitializers 函数可以看到本文的主角 DeploymentController 的启动方法:

cmd/kube-controller-manager/app/controllermanager.go:445
1
2
3
4
5
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
// ...略
register("deployment", startDeploymentController)
// ...略
}

进入里面看看:

cmd/kube-controller-manager/app/apps.go:72
1
2
3
4
5
6
7
8
9
10
11
12
13
func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
dc, err := deployment.NewDeploymentController(
controllerContext.InformerFactory.Apps().V1().Deployments(),
controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("deployment-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs))
return nil, true, nil
}

Deployment 是通过 ReplicaSet 来管理 Pod 的,所以这里会获取这三种资源的 Informer。

然后在协程中启动 DeploymentController.Run 方法,开启 Deployment 资源管理的控制循环。在启动方法里使用了协程,所以在 StartControllers 遍历时没有使用。

pkg/controller/deployment/deployment_controller.go:144
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()

// Start events processing pipeline.
dc.eventBroadcaster.StartStructuredLogging(0)
dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
defer dc.eventBroadcaster.Shutdown()

defer dc.queue.ShutDown()

klog.InfoS("Starting controller", "controller", "deployment")
defer klog.InfoS("Shutting down controller", "controller", "deployment")

// 等待本地 deployment、rs 和 pod 缓存与服务器同步
if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
return
}

// 启动多个 worker 执行流程
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, dc.worker, time.Second)
}

<-ctx.Done()
}

workers 的默认值是 5,所以启动 5 个协程来运行 worker

pkg/controller/deployment/deployment_controller.go:461
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (dc *DeploymentController) worker(ctx context.Context) {
for dc.processNextWorkItem(ctx) {
}
}

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
key, quit := dc.queue.Get()
if quit {
return false
}
defer dc.queue.Done(key)

err := dc.syncHandler(ctx, key.(string))
dc.handleErr(err, key)

return true
}

worker 函数直接无限循环执行 processNextWorkItem 函数。Kubernetes 很多 Controller 都使用这样的格式声明处理 Controller 数据的方法:

1
2
3
func (dc *XXController) processNextWorkItem(ctx context.Context) bool {

}

processNextWorkItem 函数有以下特点:

  • 一次只从 queue 中取出一个 key 来处理;
  • 同一个 key 不会并发调用 syncHandler
  • 处理完后将 key 标记为完成。

queue 组件

queue 中的数据由 Reflector 回调的函数创建。在 NewDeploymentController 中创建 Controller 时注册了事件处理函数:

注册事件处理函数
1
2
3
4
5
6
7
8
9
10
11
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
// ...略

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeployment,
UpdateFunc: dc.updateDeployment,
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: dc.deleteDeployment,
})
// ...略
}

当接收到 Deployment 的创建事件后,由 enqueue 方法实现加入队列 queue

入队逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (dc *DeploymentController) addDeployment(obj interface{}) {
d := obj.(*apps.Deployment)
klog.V(4).InfoS("Adding deployment", "deployment", klog.KObj(d))
dc.enqueueDeployment(d)
}

dc.enqueueDeployment = dc.enqueue

func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
key, err := controller.KeyFunc(deployment)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
return
}

dc.queue.Add(key)
}

无论是创建、变更还是删除事件,最终都是通过 enqueue 方法将事件加入队列中,然后在主流程中取出处理。

主流程

queue 中存储的事件都是调用的 syncHandler 函数进行处理。syncHandler 是个函数指针,指向了 dc.syncDeployment 函数:

pkg/controller/deployment/deployment_controller.go:569
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
// ...略

// 获取事件的 Deployment
deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
return nil
}
if err != nil {
return err
}

// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()

// 如果 deployment 的 selector 为空,则发布告警事件并返回
everything := metav1.LabelSelector{}
if reflect.DeepEqual(d.Spec.Selector, &everything) {
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
if d.Status.ObservedGeneration < d.Generation {
d.Status.ObservedGeneration = d.Generation
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
}
return nil
}

// 获取 Deployment 对应的 ReplicaSet
rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
if err != nil {
return err
}

// 获取 Deployment 的 Pod,使用 ReplicaSet 作为 Key 分组
// 返回值 podMap 的用途:
// 1. 检查 Pod 是否被 pod-template-hash 正确标识
// 2. 检查执行 Recreate Deployment 过程中,是否有旧 Pod 在运行
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err
}

if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(ctx, d, rsList)
}

// 在暂停或恢复 Deployment 时,使用 Unknown 状态更新 Deployment 状态。
// 这样可以保证有用户通过设置 progressDeadlineSeconds 恢复 Deployment 时不会超时
if err = dc.checkPausedConditions(ctx, d); err != nil {
return err
}

if d.Spec.Paused {
return dc.sync(ctx, d, rsList)
}

// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
if getRollbackTo(d) != nil {
return dc.rollback(ctx, d, rsList)
}

// 检查是否 scaling 事件
scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(ctx, d, rsList)
}

switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
return dc.rolloutRecreate(ctx, d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
return dc.rolloutRolling(ctx, d, rsList)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

syncDeployment 主要执行了以下逻辑:

  • 获取事件的 Deployment
  • 如果 Deploymentselector 为空,则发布告警事件并返回;
  • 获取 Deployment 对应的 ReplicaSet
  • 获取 DeploymentPod,使用 ReplicaSet 作为 Key 分组;
  • 如果当前是在暂停或恢复 Deployment,使用 Unknown 状态更新 Deployment 状态;
  • 如果在回滚状态,进行回滚;
  • 如果是 scaling 事件,执行调整;
  • 判断当前部署策略
    • 滚动更新(RollingUpdateDeploymentStrategyType 默认):执行 rolloutRolling
    • 重建(RecreateDeploymentStrategyType):执行 rolloutRecreate

主流程主要做四件事:

  1. 扩缩容:调用 isScalingEvent() 函数遍历活跃 ReplicaSet,判断是否 desired-replicas 注解与 d.Spec.Replicas 存在差异,有差异表示 Deployment 期望副本数有变化;
  2. 暂停处理;
  3. 回滚;
  4. 更新:d.Spec.Template 有变化更新 ReplicaSetd.Spec 的其它字段变化实际就更新状态。

创建 Deployment 时,其实是在更新逻辑的 rolloutRollingrolloutRecreate 方法中创建的新的 ReplicaSet

扩缩容和暂停

当前 Deployment 的状态如果是 .Spec.Paused 状态或 scaling 状态(dc.isScalingEvent 函数判断),就执行 dc.sync 方法同步 Deployment 状态。

dc.sync 方法负责协调 Deploymentscaling 状态事件或 paused 操作:

pkg/controller/deployment/sync.go:49
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
// 获取新 ReplicaSet 和历史所有的 ReplicaSet
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
if err != nil {
return err
}
// 尝试执行扩缩容
if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
// If we get an error while trying to scale, the deployment will be requeued
// so we can abort this resync
return err
}

// Clean up the deployment when it's paused and no rollback is in flight.
// 当前处于 Pause 状态但没在进行回滚时,进行清理 Deployment
if d.Spec.Paused && getRollbackTo(d) == nil {
if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
return err
}
}

allRSs := append(oldRSs, newRS)
// 同步 Deploymen 状态
return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

cleanupDeployment 执行的清理是根据配置的历史版本数保存上限,清理超出限制的历史版本。

scaling 状态

scaling 状态的判断是通过 ReplicaSetdeployment.kubernetes.io/desired-replicas 注解进行的。

pkg/controller/deployment/sync.go:532
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
if err != nil {
return false, err
}
allRSs := append(oldRSs, newRS)
logger := klog.FromContext(ctx)
for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs)
if !ok {
continue
}
// 注解的值和 Deployment 的期望值不同
if desired != *(d.Spec.Replicas) {
return true, nil
}
}
return false, nil
}

如果下面两个值不一样就是 scaling 状态:

  1. Deployment 期望的副本数 .Spec.Replicas
  2. 任一活跃 ReplicaSet 的注解 deployment.kubernetes.io/desired-replicas

获取所有 ReplicaSet

上面通过调用 dc.getAllReplicaSetsAndSyncRevision 方法获取 Deployment 的所有旧的 ReplicaSet 和当前最新的 ReplicaSet

pkg/controller/deployment/sync.go:116
1
2
3
4
5
6
7
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(
ctx context.Context,
d *apps.Deployment,
rsList []*apps.ReplicaSet,
createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) {
//...
}

确定一个 ReplicaSet 是新 ReplicaSet 的方式是判断 DeploymentReplicaSet.Spec.Template 是否 Hash 相等。

当传入方法的第四个参数为 true 且找不到符合条件的新 ReplicaSet 时,会创建一个新的 ReplicaSet。

ReplicaSet 会使用 Deployment 对象去配置 .Spec.ReplicasDesiredReplicasAnnotation 注解:

  • .Spec.ReplicasDeployment.Spec.Replicas + MaxSurge - 当前旧 Replicas 数量,一般等于 MaxSurge 值;
    • 滚动更新时,总 Pod 数量会比期望数量多一些,更新完成后会恢复为期望值;
    • 替换更新时,直接等于 Deployment 期望值;
    • 该值不会高于 Deployment 期望值;
  • DesiredReplicasAnnotation:`Deployment.Spec.Replicas

所以,这里返回的 newRS 就是我们 Deployment 期望的最终状态。

活跃 ReplicaSet

所谓活跃的就是 .Spec.Replicas 大于 0 的。这里有三种情况:

  • 单纯在调整 DeploymentReplicas:只有一个活跃ReplicaSet
  • 从 0 副本扩容:旧 ReplicaSet 全都不活跃,使用传入的新 ReplicaSet
  • 滚动升级:同时存在多个活跃的 ReplicaSet,不满足条件,走下一个分支
    • 旧的 ReplicaSet:正在缩容
    • 新的 ReplicaSet:创建时设置了初始 .Spec.Replicas,所以能被当成活跃的 ReplicaSet

99% 的时间里 Deployment 对应的活跃的 ReplicaSet 只有一个,只有更新时才会出现 2 个 ReplicaSet ,极少数情况下(短时间重复更新)才会出现 2 个以上的 ReplicaSet

scale 方法

接下来重点看一下 scale 方法:

pkg/controller/deployment/sync.go:298
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {

// 获取活跃或最新的 ReplicaSet,有多个活跃 RS 时返回 nil;
// 如果只有一个活跃的 ReplicaSet 就把这个 RS 副本数扩容到 Deployment 配置的;
// 如果无活跃的 RS,则扩容最新的 RS
if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
// 如果 RS 副本数已经和 Deployment 配置的相同就退出
if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
return nil
}
// 执行 scale 操作
_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment)
return err
}

// 如果新的 RS 已经饱和则旧 RS 应该全部缩容到 0
if deploymentutil.IsSaturated(deployment, newRS) {
for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0, deployment); err != nil {
return err
}
}
return nil
}

// 在滚动更新时,同时存在老的 RS 和新的 RS,需要适当控制 RS 的扩缩容以保证不超过 MaxSurge
if deploymentutil.IsRollingUpdate(deployment) {
allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

allowedSize := int32(0)
if *(deployment.Spec.Replicas) > 0 {
allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
}

// 需要增加的 Pod 数量,负数则为减少。调整的数量需要合适的分布到活跃的 RS 中
deploymentReplicasToAdd := allowedSize - allRSsReplicas

// 缩放方向决定了在我们试图缩放相同大小 RS 的情况下会发生什么。
// 当进行扩容时,应操作更新的 RS;缩容时先操作更老的 RS
var scalingOperation string
switch {
case deploymentReplicasToAdd > 0:
sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
scalingOperation = "up"

case deploymentReplicasToAdd < 0:
sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
scalingOperation = "down"
}

// 遍历所有活跃 RS,计算每个 RS 的副本数量
deploymentReplicasAdded := int32(0)
nameToSize := make(map[string]int32)
for i := range allRSs {
rs := allRSs[i]

// 如果有需要调整的 Pod 就进行调整,否则保持
if deploymentReplicasToAdd != 0 {
proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
deploymentReplicasAdded += proportion
} else {
nameToSize[rs.Name] = *(rs.Spec.Replicas)
}
}

// 更新 RS
for i := range allRSs {
rs := allRSs[i]

// Add/remove any leftovers to the largest replica set.
// 第一个 RS
if i == 0 && deploymentReplicasToAdd != 0 {
// 把多余的变化应用到第一个 RS 上
leftover := deploymentReplicasToAdd - deploymentReplicasAdded
nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
if nameToSize[rs.Name] < 0 {
nameToSize[rs.Name] = 0
}
}

// 更新 RS
if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
// Return as soon as we fail, the deployment is requeued
return err
}
}
}
return nil
}

该方法主要是对 ReplicaSet 进行扩缩容操作,这个方法只有 scaling 事件和 pausedDeployment 中使用,正常的滚动更新不会走这里处理

  • 首先获取活跃的 ReplicaSet
    • 如果有一个活跃 ReplicaSet(没有活跃就是最后一个 ReplicaSet)则直接对该 ReplicaSet 进行 scale 操作(替换更新和常规的的扩缩容在这里处理);
    • 多个活跃 ReplicaSet 就进入下一面代码;
  • 接下来就判断是否新 ReplicaSet 已经调整完毕:
    • 判断新的 ReplicaSet 是否已经饱和,如饱和将旧 RS 缩容到 0;
  • 没调整完说明同时存在老的 ReplicaSet 和新的 ReplicaSet,需要适当控制 RS 的扩缩容以保证不超过 MaxSurge
    • 只有策略是滚动升级才可能运行到这里,替换更新一般只进入第一个步骤就结束了;
    • 此时滚动更新正在进行,紧接着进行扩缩容操作

新旧 ReplicaSet 扩缩容

当同时存在新旧的 ReplicaSet 时,在 scale 方法的最后对滚动升级的 ReplicaSet 进行调整。

这个新旧 ReplicaSet 扩缩容的动作,只是在滚动更新的同时又进行扩缩容操作时进行,主要的逻辑将新增或减少的副本数先平均分摊到所有活跃的 ReplicaSet,再将剩余的应用到原来副本数最多的 ReplicaSet 上(当副本数相同就比较创建时间,扩容选新,缩容选旧)。

详细逻辑如下:

  • 首先通过预期 Replicas 数量和当前活跃 ReplicaSet 的副本总数,计算出要变动的 Pod 数量 deploymentReplicasToAdd
    • 如果是负值:缩容,活跃 ReplicaSet 列表排序,数量多、旧的在前;
    • 如果是正值:扩容,活跃 ReplicaSet 列表排序,数量多、新的在前。
  • 遍历所有活跃 ReplicaSet,计算每个 ReplicaSet 的副本数,将 deploymentReplicasToAdd 分摊到各个活跃的 ReplicaSet 上;
  • 再次遍历所有活跃 ReplicaSet,更新对应 ReplicaSet 的副本数并将多余的 deploymentReplicasToAdd 应用到第一个 ReplicaSet
    • 如果是扩容,应用到数量最多或最新的;
    • 如果是缩容,应用到数量最多或最旧的。

注意

这里执行完后 scaling 事件就已经结束(deployment.kubernetes.io/desired-replicas 注解已更新成 Deployment 的 .Spec.Replicas 的值)。

后面继续走滚动更新的逻辑(rolloutRolling)完成新旧 ReplicaSet 的滚动。

回滚

回滚操作的原理是:

  • 复制历史的某个版本的 ReplicaSet 里的 podTemplate.Spec
  • 替换当前 Deployment.Spec.Template,删除 rollback 注解;
  • 下一次 Deployment 完成更新操作。

更新

这里的更新操作完成了 ReplicaSet 注解和 .Spec.Replicas 字段的操作,实际的扩缩容操作是由 ReplicaSetController 来进行的。

同时,只有 Deployment.Spec.Template 有变化才创建新的 ReplicaSet 并进行更新,而 Deployment.Spec 的其它字段变化只会更新 Deployment 和原有 ReplicaSet 的状态。

滚动更新

滚动更新 RollingUpdate 是默认的策略。DeploymentSpec.Template 字段的内容只要一更新就会生成新的 ReplicaSet,并且基于新的 ReplicaSet 执行滚动更新,原有的 ReplicaSet 会进行滚动缩容。

pkg/controller/deployment/rolling.go:32
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
// 获取 RS,如果不存在新 RS 就创建
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)

// 尝试进行扩容操作
scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

// 尝试进行缩容操作
scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

if deploymentutil.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
return err
}
}

// Sync deployment status
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

滚动更新过程:

  • getAllReplicaSetsAndSyncRevision:获取所有 ReplicaSet,如果新 ReplicaSet 不存在就创建一个新的(参看上面:获取所有 ReplicaSet);
  • reconcileNewReplicaSet 对新 ReplicaSet 进行扩容;
  • reconcileOldReplicaSets 对旧 ReplicaSet 进行缩容;
    • 对于新增的 ReplicaSet,此时总副本数已经超过期望数,需要在这里对旧 ReplicaSet 进行缩容操作
  • 更新 Deployment 状态

这里的扩缩容操作实际上是通过修改 .Spec.Replicas 和使用 deploymentutil.SetReplicasAnnotations 函数操作 ReplicaSet 注解 deployment.kubernetes.io/desired-replicas 实现的。

deployment.kubernetes.io/desired-replicas 注解会设置为 Deployment.Spec.Replicas 的值。滚动的的过程是修改 ReplicaSet.Spec.Replicas

整个滚动更新过程就是不断地扩容新 ReplicaSet、缩容旧 ReplicaSet 再更新 Deployment 过程:

  1. 扩容新 ReplicaSet.Spec.Replicas:原值 + maxSurge,直到等于 Deployment.Spec.Replicas
  2. 缩容所有旧 ReplicaSet.Spec.Replicas:总数缩减最多 maxUnavalible,直到等于 0;
  3. 更新 DeploymentStatus,触发下一轮 reconcile。

多轮滚动后,新 ReplicaSet 副本数达到预期值,旧 ReplicaSet 副本数也缩减到 0,滚动更新结束。

替换更新

替换更新相比之下更为简单,和滚动更新不同的是替换更新先对原有的 ReplicaSet 进行缩容操作,直到所有的 Pod 都退出后再创建新的 ReplicaSet 并进行扩容。

pkg/controller/deployment/recreate.go:29
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
// 如果不存在新 RS,在缩容前不创建
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

// 对 RS 进行缩容
scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus.
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

// 缩容没完成,还有 Pod 在运行,先等着(先跳过)
if oldPodsRunning(newRS, oldRSs, podMap) {
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

// 如果不存在新 RS,就创建
if newRS == nil {
newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
if err != nil {
return err
}
allRSs = append(oldRSs, newRS)
}

// 扩容新 RS
if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil {
return err
}

if util.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
return err
}
}

// Sync deployment status.
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

不更新情况

前面说过,Deployment.Spec 的其它字段变化只会更新 Deployment 和原有 ReplicaSet 的状态。这里回顾一下 rolloutRollingrolloutRecreate 的代码,看看是怎么实现的。

不更新情况:指 Deployment.Spec.Template 没有发生变化,Deployment.Spec 有变化。

滚动更新

pkg/controller/deployment/rolling.go:32
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
// 获取 RS,当不更新时,newRS 取原先的 RS
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)

// 如果容量不变,不扩容
scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

// 缩容操作。如果活跃的旧 RS 为空,不缩容
scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

// 更新状态
if deploymentutil.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
return err
}
}

// Sync deployment status
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

替换更新

pkg/controller/deployment/recreate.go:29
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
// 当不更新时,newRS 取原先的 RS
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
// oldRSs 中没有活跃 RS,这里为空
activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

// 对 RS 进行缩容,activeOldRSs 为空,实际不缩容
scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus.
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

// 在进行的 Pod 属于当前 newRS 的,跳过
if oldPodsRunning(newRS, oldRSs, podMap) {
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

// newRS 不会为 nil
if newRS == nil {
newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
if err != nil {
return err
}
allRSs = append(oldRSs, newRS)
}

// 副本数没变化,实际没扩容
if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil {
return err
}

// 更新状态
if util.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
return err
}
}

// Sync deployment status.
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

引申

实现 Deployment 重启

平时使用 Deployment 部署开发环境时,想要重启应用很多时候都是直接把 Pod 删除来达到重启的目的。有没有一种更优雅的方法呢?

从上面对 DeploymentController 对于更新的实现可以知道,当 Deployment.Spec.Template 发生变化时会触发更新流程。

我们可以在 .Spec.Template 里加个条注解(Annotations)记录,值设定为当前的时间,这样就能触发更新流程,实现重启功能。

这样的做法其实是 kubectl rollout restart 命令的实现原理,实际上 POD Template 并没有改变,只是通过在 .spec.template.metadata.annotations 注解里增加或修改 kubectl.kubernetes.io/restartedAt 的时间戳来实现重启,并不会修改副本数。

vendor/k8s.io/kubectl/pkg/polymorphichelpers/objectrestarter.go:32
1
2
3
4
5
6
7
8
9
10
11
12
13
func defaultObjectRestarter(obj runtime.Object) ([]byte, error) {  
switch obj := obj.(type) {
case *extensionsv1beta1.Deployment:
if obj.Spec.Paused {
return nil, errors.New("can't restart paused deployment (run rollout resume first)")
}
if obj.Spec.Template.ObjectMeta.Annotations == nil {
obj.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
}
obj.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
return runtime.Encode(scheme.Codecs.LegacyCodec(extensionsv1beta1.SchemeGroupVersion), obj)
// ...略
}

总结

DeploymentPod 管理是通过 ReplicaSet 来进行的,DeploymentController 的代码也不涉及 Pod 的直接操作。

DeploymentControllerDeploymentReplicaSet 的操作并不是立即完成的,而是在控制循环中反复执行、收敛、修正,最终达到期望状态,完成更新。

引用

  1. P3-Controller 分类与 Deployment Controller
  2. 当你创建了一个 Deployment 时,Kubernetes 内部发生了什么?
作者

Jakes Lee

发布于

2024-08-20

更新于

2024-08-20

许可协议

评论