Deployment 是 Kubernetes 三个常用工作负载中最常用的。Deployment 用于管理应用的部署情况,可以实现应用的滚动升级和回滚,还能实现应用的扩缩容。
Deployment 通过 ReplicaSet 来管理 Pod。一个完整的 Deployment 创建到 Pod 被拉起的流程由多个控制器协同完成:
当用户创建 Deployment 时通过 kubectl 等客户端调用 API Server:
API Server 对请求进行认证,最终创建一个 Deloyment 对象,此时会产生 Deplyment 创建事件 ;
DeploymentController 监听到事件后,创建 ReplicaSet 对象(由 dc.syncDeployment 方法实现),产生 ReplicaSet 创建事件;
ReplicaSetController 监听到 ReplicaSet 创建事件,创建 Pod 对象(由 syncReplicaSet 方法实现),产生 Pod 创建事件;
此时 Pod 的 Spec.nodeName 为空;
scheduler 监听到 Pod 创建事件并对 Spec.nodeName 为空的 Pod 执行调度逻辑,选定节点后更新 Pod 的 Spec.nodeName,产生 Pod 更新事件(由 schedule 的 sched.scheduleOne 方法实现);
kubelet 监听到 Pod 更新事件,判断 Pod 的 Spec.nodeName 是否是当前节点,匹配后按 Pod 的定义启动容器,同时更新 Pod 的 Status(由 Kubelet 的 syncPod 实现)。
Controller 实例的构造 前文《Kubernetes 集群的大脑 Controller Manager 》里介绍过 kube-controller-manager 启动内置控制器的方法。而在 NewControllerInitializers 函数可以看到本文的主角 DeploymentController 的启动方法:
cmd/kube-controller-manager/app/controllermanager.go:445 1 2 3 4 5 func NewControllerInitializers (loopMode ControllerLoopMode) map [string ]InitFunc { register("deployment" , startDeploymentController) }
进入里面看看:
cmd/kube-controller-manager/app/apps.go:72 1 2 3 4 5 6 7 8 9 10 11 12 13 func startDeploymentController (ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool , error ) { dc, err := deployment.NewDeploymentController( controllerContext.InformerFactory.Apps().V1().Deployments(), controllerContext.InformerFactory.Apps().V1().ReplicaSets(), controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.ClientBuilder.ClientOrDie("deployment-controller" ), ) if err != nil { return nil , true , fmt.Errorf("error creating Deployment controller: %v" , err) } go dc.Run(ctx, int (controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs)) return nil , true , nil }
Deployment 是通过 ReplicaSet 来管理 Pod 的,所以这里会获取这三种资源的 Informer。
然后在协程中启动 DeploymentController.Run 方法,开启 Deployment 资源管理的控制循环。在启动方法里使用了协程,所以在 StartControllers 遍历时没有使用。
pkg/controller/deployment/deployment_controller.go:144 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (dc *DeploymentController) Run(ctx context.Context, workers int ) { defer utilruntime.HandleCrash() dc.eventBroadcaster.StartStructuredLogging(0 ) dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("" )}) defer dc.eventBroadcaster.Shutdown() defer dc.queue.ShutDown() klog.InfoS("Starting controller" , "controller" , "deployment" ) defer klog.InfoS("Shutting down controller" , "controller" , "deployment" ) if !cache.WaitForNamedCacheSync("deployment" , ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { return } for i := 0 ; i < workers; i++ { go wait.UntilWithContext(ctx, dc.worker, time.Second) } <-ctx.Done() }
workers 的默认值是 5,所以启动 5 个协程来运行 worker。
pkg/controller/deployment/deployment_controller.go:461 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (dc *DeploymentController) worker(ctx context.Context) { for dc.processNextWorkItem(ctx) { } } func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool { key, quit := dc.queue.Get() if quit { return false } defer dc.queue.Done(key) err := dc.syncHandler(ctx, key.(string )) dc.handleErr(err, key) return true }
worker 函数直接无限循环执行 processNextWorkItem 函数。Kubernetes 很多 Controller 都使用这样的格式声明处理 Controller 数据的方法:
1 2 3 func (dc *XXController) processNextWorkItem(ctx context.Context) bool {}
processNextWorkItem 函数有以下特点:
一次只从 queue 中取出一个 key 来处理;
同一个 key 不会并发调用 syncHandler;
处理完后将 key 标记为完成。
queue 组件 queue 中的数据由 Reflector 回调的函数创建。在 NewDeploymentController 中创建 Controller 时注册了事件处理函数:
注册事件处理函数 1 2 3 4 5 6 7 8 9 10 11 func NewDeploymentController (dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error ) { dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, UpdateFunc: dc.updateDeployment, DeleteFunc: dc.deleteDeployment, }) }
当接收到 Deployment 的创建事件后,由 enqueue 方法实现加入队列 queue:
入队逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (dc *DeploymentController) addDeployment(obj interface {}) { d := obj.(*apps.Deployment) klog.V(4 ).InfoS("Adding deployment" , "deployment" , klog.KObj(d)) dc.enqueueDeployment(d) } dc.enqueueDeployment = dc.enqueue func (dc *DeploymentController) enqueue(deployment *apps.Deployment) { key, err := controller.KeyFunc(deployment) if err != nil { utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v" , deployment, err)) return } dc.queue.Add(key) }
无论是创建、变更还是删除事件 ,最终都是通过 enqueue 方法将事件加入队列中,然后在主流程中取出处理。
主流程 queue 中存储的事件都是调用的 syncHandler 函数进行处理。syncHandler 是个函数指针,指向了 dc.syncDeployment 函数:
pkg/controller/deployment/deployment_controller.go:569 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 func (dc *DeploymentController) syncDeployment(ctx context.Context, key string ) error { deployment, err := dc.dLister.Deployments(namespace).Get(name) if errors.IsNotFound(err) { klog.V(2 ).InfoS("Deployment has been deleted" , "deployment" , klog.KRef(namespace, name)) return nil } if err != nil { return err } d := deployment.DeepCopy() everything := metav1.LabelSelector{} if reflect.DeepEqual(d.Spec.Selector, &everything) { dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll" , "This deployment is selecting all pods. A non-empty selector is required." ) if d.Status.ObservedGeneration < d.Generation { d.Status.ObservedGeneration = d.Generation dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) } return nil } rsList, err := dc.getReplicaSetsForDeployment(ctx, d) if err != nil { return err } podMap, err := dc.getPodMapForDeployment(d, rsList) if err != nil { return err } if d.DeletionTimestamp != nil { return dc.syncStatusOnly(ctx, d, rsList) } if err = dc.checkPausedConditions(ctx, d); err != nil { return err } if d.Spec.Paused { return dc.sync(ctx, d, rsList) } if getRollbackTo(d) != nil { return dc.rollback(ctx, d, rsList) } scalingEvent, err := dc.isScalingEvent(ctx, d, rsList) if err != nil { return err } if scalingEvent { return dc.sync(ctx, d, rsList) } switch d.Spec.Strategy.Type { case apps.RecreateDeploymentStrategyType: return dc.rolloutRecreate(ctx, d, rsList, podMap) case apps.RollingUpdateDeploymentStrategyType: return dc.rolloutRolling(ctx, d, rsList) } return fmt.Errorf("unexpected deployment strategy type: %s" , d.Spec.Strategy.Type) }
syncDeployment 主要执行了以下逻辑:
获取事件的 Deployment;
如果 Deployment 的 selector 为空,则发布告警事件并返回;
获取 Deployment 对应的 ReplicaSet;
获取 Deployment 的 Pod,使用 ReplicaSet 作为 Key 分组;
如果当前是在暂停或恢复 Deployment,使用 Unknown 状态更新 Deployment 状态;
如果在回滚状态,进行回滚;
如果是 scaling 事件 ,执行调整;
判断当前部署策略
滚动更新(RollingUpdateDeploymentStrategyType 默认):执行 rolloutRolling
重建(RecreateDeploymentStrategyType):执行 rolloutRecreate
主流程主要做四件事:
扩缩容:调用 isScalingEvent() 函数遍历活跃 ReplicaSet,判断是否 desired-replicas 注解与 d.Spec.Replicas 存在差异,有差异表示 Deployment 期望副本数有变化;
暂停处理;
回滚;
更新:d.Spec.Template 有变化更新 ReplicaSet,d.Spec 的其它字段变化实际就更新状态。
创建 Deployment 时,其实是在更新逻辑的 rolloutRolling 或 rolloutRecreate 方法中创建的新的 ReplicaSet。
扩缩容和暂停 当前 Deployment 的状态如果是 .Spec.Paused 状态或 scaling 状态(dc.isScalingEvent 函数判断),就执行 dc.sync 方法同步 Deployment 状态。
dc.sync 方法负责协调 Deployment 的 scaling 状态事件或 paused 操作:
pkg/controller/deployment/sync.go:49 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false ) if err != nil { return err } if err := dc.scale(ctx, d, newRS, oldRSs); err != nil { return err } if d.Spec.Paused && getRollbackTo(d) == nil { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } allRSs := append (oldRSs, newRS) return dc.syncDeploymentStatus(ctx, allRSs, newRS, d) }
cleanupDeployment 执行的清理是根据配置的历史版本数保存上限,清理超出限制的历史版本。
scaling 状态scaling 状态的判断是通过 ReplicaSet 的 deployment.kubernetes.io/desired-replicas 注解进行的。
pkg/controller/deployment/sync.go:532 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool , error ) { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false ) if err != nil { return false , err } allRSs := append (oldRSs, newRS) logger := klog.FromContext(ctx) for _, rs := range controller.FilterActiveReplicaSets(allRSs) { desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs) if !ok { continue } if desired != *(d.Spec.Replicas) { return true , nil } } return false , nil }
如果下面两个值不一样就是 scaling 状态:
Deployment 期望的副本数 .Spec.Replicas;
任一活跃 ReplicaSet 的注解 deployment.kubernetes.io/desired-replicas。
获取所有 ReplicaSet 上面通过调用 dc.getAllReplicaSetsAndSyncRevision 方法获取 Deployment 的所有旧的 ReplicaSet 和当前最新的 ReplicaSet。
pkg/controller/deployment/sync.go:116 1 2 3 4 5 6 7 func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision( ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool ) (*apps.ReplicaSet, []*apps.ReplicaSet, error ) { }
确定一个 ReplicaSet 是新 ReplicaSet 的方式是判断 Deployment 和 ReplicaSet 的 .Spec.Template 是否 Hash 相等。
当传入方法的第四个参数为 true 且找不到符合条件的新 ReplicaSet 时,会创建一个新的 ReplicaSet。
新 ReplicaSet 会使用 Deployment 对象去配置 .Spec.Replicas 和 DesiredReplicasAnnotation 注解:
.Spec.Replicas:Deployment.Spec.Replicas + MaxSurge - 当前旧 Replicas 数量,一般等于 MaxSurge 值;
滚动更新时,总 Pod 数量会比期望数量多一些,更新完成后会恢复为期望值;
替换更新时,直接等于 Deployment 期望值;
该值不会高于 Deployment 期望值;
DesiredReplicasAnnotation:`Deployment.Spec.Replicas
所以,这里返回的 newRS 就是我们 Deployment 期望的最终状态。
活跃 ReplicaSet 所谓活跃的就是 .Spec.Replicas 大于 0 的。这里有三种情况:
单纯在调整 Deployment 的 Replicas:只有一个活跃 的 ReplicaSet;
从 0 副本扩容:旧 ReplicaSet 全都不活跃,使用传入的新 ReplicaSet;
滚动升级:同时存在多个活跃的 ReplicaSet,不满足条件,走下一个分支
旧的 ReplicaSet:正在缩容
新的 ReplicaSet:创建时设置了初始 .Spec.Replicas,所以能被当成活跃的 ReplicaSet
99% 的时间里 Deployment 对应的活跃的 ReplicaSet 只有一个,只有更新时才会出现 2 个 ReplicaSet ,极少数情况下(短时间重复更新)才会出现 2 个以上的 ReplicaSet。
scale 方法 接下来重点看一下 scale 方法:
pkg/controller/deployment/sync.go:298 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error { if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil { if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) { return nil } _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment) return err } if deploymentutil.IsSaturated(deployment, newRS) { for _, old := range controller.FilterActiveReplicaSets(oldRSs) { if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0 , deployment); err != nil { return err } } return nil } if deploymentutil.IsRollingUpdate(deployment) { allRSs := controller.FilterActiveReplicaSets(append (oldRSs, newRS)) allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) allowedSize := int32 (0 ) if *(deployment.Spec.Replicas) > 0 { allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment) } deploymentReplicasToAdd := allowedSize - allRSsReplicas var scalingOperation string switch { case deploymentReplicasToAdd > 0 : sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs)) scalingOperation = "up" case deploymentReplicasToAdd < 0 : sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs)) scalingOperation = "down" } deploymentReplicasAdded := int32 (0 ) nameToSize := make (map [string ]int32 ) for i := range allRSs { rs := allRSs[i] if deploymentReplicasToAdd != 0 { proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion deploymentReplicasAdded += proportion } else { nameToSize[rs.Name] = *(rs.Spec.Replicas) } } for i := range allRSs { rs := allRSs[i] if i == 0 && deploymentReplicasToAdd != 0 { leftover := deploymentReplicasToAdd - deploymentReplicasAdded nameToSize[rs.Name] = nameToSize[rs.Name] + leftover if nameToSize[rs.Name] < 0 { nameToSize[rs.Name] = 0 } } if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil { return err } } } return nil }
该方法主要是对 ReplicaSet 进行扩缩容操作,这个方法只有 scaling 事件和 paused 的 Deployment 中使用,正常的滚动更新不会走这里处理 :
首先获取活跃的 ReplicaSet:
如果有一个活跃 ReplicaSet(没有活跃就是最后一个 ReplicaSet)则直接对该 ReplicaSet 进行 scale 操作(替换更新 和常规的的扩缩容在这里处理);
多个活跃 ReplicaSet 就进入下一面代码;
接下来就判断是否新 ReplicaSet 已经调整完毕:
判断新的 ReplicaSet 是否已经饱和,如饱和将旧 RS 缩容到 0;
没调整完说明同时存在老的 ReplicaSet 和新的 ReplicaSet,需要适当控制 RS 的扩缩容以保证不超过 MaxSurge。
只有策略是滚动升级才可能运行到这里,替换更新一般只进入第一个步骤就结束了;
此时滚动更新正在进行,紧接着进行扩缩容操作 。
新旧 ReplicaSet 扩缩容 当同时存在新旧的 ReplicaSet 时,在 scale 方法的最后对滚动升级的 ReplicaSet 进行调整。
这个新旧 ReplicaSet 扩缩容的动作,只是在滚动更新的同时又进行扩缩容操作 时进行,主要的逻辑将新增或减少的副本数先平均分摊 到所有活跃的 ReplicaSet,再将剩余的 应用到原来副本数最多的 ReplicaSet 上(当副本数相同就比较创建时间,扩容选新,缩容选旧)。
详细逻辑如下:
首先通过预期 Replicas 数量和当前活跃 ReplicaSet 的副本总数,计算出要变动的 Pod 数量 deploymentReplicasToAdd:
如果是负值:缩容,活跃 ReplicaSet 列表排序,数量多、旧的在前;
如果是正值:扩容,活跃 ReplicaSet 列表排序,数量多、新的在前。
遍历所有活跃 ReplicaSet,计算每个 ReplicaSet 的副本数,将 deploymentReplicasToAdd 分摊到各个活跃的 ReplicaSet 上;
再次遍历所有活跃 ReplicaSet,更新对应 ReplicaSet 的副本数并将多余的 deploymentReplicasToAdd 应用到第一个 ReplicaSet:
如果是扩容,应用到数量最多或最新 的;
如果是缩容,应用到数量最多或最旧 的。
这里执行完后 scaling 事件就已经结束(deployment.kubernetes.io/desired-replicas 注解已更新成 Deployment 的 .Spec.Replicas 的值)。
后面继续走滚动更新的逻辑(rolloutRolling)完成新旧 ReplicaSet 的滚动。
回滚 回滚操作的原理是:
复制历史的某个版本的 ReplicaSet 里的 podTemplate.Spec;
替换当前 Deployment 的 .Spec.Template,删除 rollback 注解;
下一次 Deployment 完成更新操作。
更新 这里的更新操作完成了 ReplicaSet 注解和 .Spec.Replicas 字段的操作,实际的扩缩容操作是由 ReplicaSetController 来进行的。
同时,只有 Deployment.Spec.Template 有变化才创建新的 ReplicaSet 并进行更新,而 Deployment.Spec 的其它字段变化只会更新 Deployment 和原有 ReplicaSet 的状态。
滚动更新 滚动更新 RollingUpdate 是默认的策略。Deployment 的 Spec.Template 字段的内容只要一更新就会生成新的 ReplicaSet,并且基于新的 ReplicaSet 执行滚动更新,原有的 ReplicaSet 会进行滚动缩容。
pkg/controller/deployment/rolling.go:32 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true ) if err != nil { return err } allRSs := append (oldRSs, newRS) scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d) if err != nil { return err } if scaledUp { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d) if err != nil { return err } if scaledDown { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if deploymentutil.DeploymentComplete(d, &d.Status) { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } return dc.syncRolloutStatus(ctx, allRSs, newRS, d) }
滚动更新过程:
getAllReplicaSetsAndSyncRevision:获取所有 ReplicaSet,如果新 ReplicaSet 不存在就创建一个新的(参看上面:获取所有 ReplicaSet );
reconcileNewReplicaSet 对新 ReplicaSet 进行扩容;
reconcileOldReplicaSets 对旧 ReplicaSet 进行缩容;
对于新增的 ReplicaSet,此时总副本数已经超过期望数,需要在这里对旧 ReplicaSet 进行缩容操作
更新 Deployment 状态
这里的扩缩容操作实际上是通过修改 .Spec.Replicas 和使用 deploymentutil.SetReplicasAnnotations 函数操作 ReplicaSet 注解 deployment.kubernetes.io/desired-replicas 实现的。
deployment.kubernetes.io/desired-replicas 注解会设置为 Deployment 的 .Spec.Replicas 的值。滚动的的过程是修改 ReplicaSet 的 .Spec.Replicas。
整个滚动更新过程就是不断地扩容新 ReplicaSet、缩容旧 ReplicaSet 再更新 Deployment 过程:
扩容新 ReplicaSet 的 .Spec.Replicas:原值 + maxSurge,直到等于 Deployment.Spec.Replicas;
缩容所有旧 ReplicaSet 的 .Spec.Replicas:总数缩减最多 maxUnavalible,直到等于 0;
更新 Deployment 的 Status,触发下一轮 reconcile。
多轮滚动后,新 ReplicaSet 副本数达到预期值,旧 ReplicaSet 副本数也缩减到 0,滚动更新结束。
替换更新 替换更新相比之下更为简单,和滚动更新不同的是替换更新先对原有的 ReplicaSet 进行缩容操作,直到所有的 Pod 都退出后再创建新的 ReplicaSet 并进行扩容。
pkg/controller/deployment/recreate.go:29 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map [types.UID][]*v1.Pod) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false ) if err != nil { return err } allRSs := append (oldRSs, newRS) activeOldRSs := controller.FilterActiveReplicaSets(oldRSs) scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d) if err != nil { return err } if scaledDown { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if oldPodsRunning(newRS, oldRSs, podMap) { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if newRS == nil { newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true ) if err != nil { return err } allRSs = append (oldRSs, newRS) } if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil { return err } if util.DeploymentComplete(d, &d.Status) { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } return dc.syncRolloutStatus(ctx, allRSs, newRS, d) }
不更新情况 前面说过,Deployment.Spec 的其它字段变化只会更新 Deployment 和原有 ReplicaSet 的状态。这里回顾一下 rolloutRolling 和 rolloutRecreate 的代码,看看是怎么实现的。
不更新情况 :指 Deployment.Spec.Template 没有发生变化,Deployment.Spec 有变化。
滚动更新 pkg/controller/deployment/rolling.go:32 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true ) if err != nil { return err } allRSs := append (oldRSs, newRS) scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d) if err != nil { return err } if scaledUp { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d) if err != nil { return err } if scaledDown { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if deploymentutil.DeploymentComplete(d, &d.Status) { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } return dc.syncRolloutStatus(ctx, allRSs, newRS, d) }
替换更新 pkg/controller/deployment/recreate.go:29 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map [types.UID][]*v1.Pod) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false ) if err != nil { return err } allRSs := append (oldRSs, newRS) activeOldRSs := controller.FilterActiveReplicaSets(oldRSs) scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d) if err != nil { return err } if scaledDown { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if oldPodsRunning(newRS, oldRSs, podMap) { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if newRS == nil { newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true ) if err != nil { return err } allRSs = append (oldRSs, newRS) } if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil { return err } if util.DeploymentComplete(d, &d.Status) { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } return dc.syncRolloutStatus(ctx, allRSs, newRS, d) }
引申 实现 Deployment 重启 平时使用 Deployment 部署开发环境时,想要重启应用很多时候都是直接把 Pod 删除来达到重启的目的。有没有一种更优雅的方法呢?
从上面对 DeploymentController 对于更新的实现可以知道,当 Deployment 的 .Spec.Template 发生变化时会触发更新流程。
我们可以在 .Spec.Template 里加个条注解(Annotations)记录,值设定为当前的时间,这样就能触发更新流程,实现重启功能。
这样的做法其实是 kubectl rollout restart 命令的实现原理,实际上 POD Template 并没有改变,只是通过在 .spec.template.metadata.annotations 注解里增加或修改 kubectl.kubernetes.io/restartedAt 的时间戳来实现重启,并不会修改副本数。
vendor/k8s.io/kubectl/pkg/polymorphichelpers/objectrestarter.go:32 1 2 3 4 5 6 7 8 9 10 11 12 13 func defaultObjectRestarter (obj runtime.Object) ([]byte , error ) { switch obj := obj.(type ) { case *extensionsv1beta1.Deployment: if obj.Spec.Paused { return nil , errors.New("can't restart paused deployment (run rollout resume first)" ) } if obj.Spec.Template.ObjectMeta.Annotations == nil { obj.Spec.Template.ObjectMeta.Annotations = make (map [string ]string ) } obj.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt" ] = time.Now().Format(time.RFC3339) return runtime.Encode(scheme.Codecs.LegacyCodec(extensionsv1beta1.SchemeGroupVersion), obj) }
总结 Deployment 的 Pod 管理是通过 ReplicaSet 来进行的,DeploymentController 的代码也不涉及 Pod 的直接操作。
DeploymentController 对 Deployment 和 ReplicaSet 的操作并不是立即完成的,而是在控制循环中反复执行、收敛、修正,最终达到期望状态,完成更新。
引用
P3-Controller 分类与 Deployment Controller
当你创建了一个 Deployment 时,Kubernetes 内部发生了什么?