Deployment
是 Kubernetes 三个常用工作负载中最常用的。Deployment
用于管理应用的部署情况,可以实现应用的滚动升级和回滚,还能实现应用的扩缩容。
Deployment
通过 ReplicaSet
来管理 Pod
。一个完整的 Deployment
创建到 Pod
被拉起的流程由多个控制器协同完成:
当用户创建 Deployment
时通过 kubectl 等客户端调用 API Server:
API Server 对请求进行认证,最终创建一个 Deloyment
对象,此时会产生 Deplyment
创建事件 ;
DeploymentController
监听到事件后,创建 ReplicaSet
对象(由 dc.syncDeployment
方法实现),产生 ReplicaSet
创建事件;
ReplicaSetController
监听到 ReplicaSet
创建事件,创建 Pod
对象(由 syncReplicaSet
方法实现),产生 Pod
创建事件;
此时 Pod
的 Spec.nodeName
为空;
scheduler
监听到 Pod
创建事件并对 Spec.nodeName
为空的 Pod
执行调度逻辑,选定节点后更新 Pod
的 Spec.nodeName
,产生 Pod 更新事件(由 schedule
的 sched.scheduleOne
方法实现);
kubelet
监听到 Pod
更新事件,判断 Pod
的 Spec.nodeName
是否是当前节点,匹配后按 Pod
的定义启动容器,同时更新 Pod
的 Status
(由 Kubelet
的 syncPod
实现)。
Controller 实例的构造 前文《Kubernetes 集群的大脑 Controller Manager 》里介绍过 kube-controller-manager 启动内置控制器的方法。而在 NewControllerInitializers
函数可以看到本文的主角 DeploymentController
的启动方法:
cmd/kube-controller-manager/app/controllermanager.go:445 1 2 3 4 5 func NewControllerInitializers (loopMode ControllerLoopMode) map [string ]InitFunc { register("deployment" , startDeploymentController) }
进入里面看看:
cmd/kube-controller-manager/app/apps.go:72 1 2 3 4 5 6 7 8 9 10 11 12 13 func startDeploymentController (ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool , error ) { dc, err := deployment.NewDeploymentController( controllerContext.InformerFactory.Apps().V1().Deployments(), controllerContext.InformerFactory.Apps().V1().ReplicaSets(), controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.ClientBuilder.ClientOrDie("deployment-controller" ), ) if err != nil { return nil , true , fmt.Errorf("error creating Deployment controller: %v" , err) } go dc.Run(ctx, int (controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs)) return nil , true , nil }
Deployment
是通过 ReplicaSet
来管理 Pod
的,所以这里会获取这三种资源的 Informer。
然后在协程中启动 DeploymentController.Run
方法,开启 Deployment
资源管理的控制循环。在启动方法里使用了协程,所以在 StartControllers
遍历时没有使用。
pkg/controller/deployment/deployment_controller.go:144 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (dc *DeploymentController) Run(ctx context.Context, workers int ) { defer utilruntime.HandleCrash() dc.eventBroadcaster.StartStructuredLogging(0 ) dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("" )}) defer dc.eventBroadcaster.Shutdown() defer dc.queue.ShutDown() klog.InfoS("Starting controller" , "controller" , "deployment" ) defer klog.InfoS("Shutting down controller" , "controller" , "deployment" ) if !cache.WaitForNamedCacheSync("deployment" , ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { return } for i := 0 ; i < workers; i++ { go wait.UntilWithContext(ctx, dc.worker, time.Second) } <-ctx.Done() }
workers
的默认值是 5,所以启动 5 个协程来运行 worker
。
pkg/controller/deployment/deployment_controller.go:461 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (dc *DeploymentController) worker(ctx context.Context) { for dc.processNextWorkItem(ctx) { } } func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool { key, quit := dc.queue.Get() if quit { return false } defer dc.queue.Done(key) err := dc.syncHandler(ctx, key.(string )) dc.handleErr(err, key) return true }
worker
函数直接无限循环执行 processNextWorkItem
函数。Kubernetes 很多 Controller 都使用这样的格式声明处理 Controller 数据的方法:
1 2 3 func (dc *XXController) processNextWorkItem(ctx context.Context) bool {}
processNextWorkItem
函数有以下特点:
一次只从 queue
中取出一个 key
来处理;
同一个 key
不会并发调用 syncHandler
;
处理完后将 key
标记为完成。
queue 组件 queue
中的数据由 Reflector
回调的函数创建。在 NewDeploymentController
中创建 Controller 时注册了事件处理函数:
注册事件处理函数 1 2 3 4 5 6 7 8 9 10 11 func NewDeploymentController (dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error ) { dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, UpdateFunc: dc.updateDeployment, DeleteFunc: dc.deleteDeployment, }) }
当接收到 Deployment
的创建事件后,由 enqueue
方法实现加入队列 queue
:
入队逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (dc *DeploymentController) addDeployment(obj interface {}) { d := obj.(*apps.Deployment) klog.V(4 ).InfoS("Adding deployment" , "deployment" , klog.KObj(d)) dc.enqueueDeployment(d) } dc.enqueueDeployment = dc.enqueue func (dc *DeploymentController) enqueue(deployment *apps.Deployment) { key, err := controller.KeyFunc(deployment) if err != nil { utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v" , deployment, err)) return } dc.queue.Add(key) }
无论是创建、变更还是删除事件 ,最终都是通过 enqueue
方法将事件加入队列中,然后在主流程中取出处理。
主流程 queue
中存储的事件都是调用的 syncHandler
函数进行处理。syncHandler
是个函数指针,指向了 dc.syncDeployment
函数:
pkg/controller/deployment/deployment_controller.go:569 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 func (dc *DeploymentController) syncDeployment(ctx context.Context, key string ) error { deployment, err := dc.dLister.Deployments(namespace).Get(name) if errors.IsNotFound(err) { klog.V(2 ).InfoS("Deployment has been deleted" , "deployment" , klog.KRef(namespace, name)) return nil } if err != nil { return err } d := deployment.DeepCopy() everything := metav1.LabelSelector{} if reflect.DeepEqual(d.Spec.Selector, &everything) { dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll" , "This deployment is selecting all pods. A non-empty selector is required." ) if d.Status.ObservedGeneration < d.Generation { d.Status.ObservedGeneration = d.Generation dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) } return nil } rsList, err := dc.getReplicaSetsForDeployment(ctx, d) if err != nil { return err } podMap, err := dc.getPodMapForDeployment(d, rsList) if err != nil { return err } if d.DeletionTimestamp != nil { return dc.syncStatusOnly(ctx, d, rsList) } if err = dc.checkPausedConditions(ctx, d); err != nil { return err } if d.Spec.Paused { return dc.sync(ctx, d, rsList) } if getRollbackTo(d) != nil { return dc.rollback(ctx, d, rsList) } scalingEvent, err := dc.isScalingEvent(ctx, d, rsList) if err != nil { return err } if scalingEvent { return dc.sync(ctx, d, rsList) } switch d.Spec.Strategy.Type { case apps.RecreateDeploymentStrategyType: return dc.rolloutRecreate(ctx, d, rsList, podMap) case apps.RollingUpdateDeploymentStrategyType: return dc.rolloutRolling(ctx, d, rsList) } return fmt.Errorf("unexpected deployment strategy type: %s" , d.Spec.Strategy.Type) }
syncDeployment
主要执行了以下逻辑:
获取事件的 Deployment
;
如果 Deployment
的 selector
为空,则发布告警事件并返回;
获取 Deployment
对应的 ReplicaSet
;
获取 Deployment
的 Pod
,使用 ReplicaSet
作为 Key
分组;
如果当前是在暂停或恢复 Deployment
,使用 Unknown
状态更新 Deployment
状态;
如果在回滚状态,进行回滚;
如果是 scaling
事件 ,执行调整;
判断当前部署策略
滚动更新(RollingUpdateDeploymentStrategyType
默认):执行 rolloutRolling
重建(RecreateDeploymentStrategyType
):执行 rolloutRecreate
主流程主要做四件事:
扩缩容:调用 isScalingEvent()
函数遍历活跃 ReplicaSet
,判断是否 desired-replicas
注解与 d.Spec.Replicas
存在差异,有差异表示 Deployment
期望副本数有变化;
暂停处理;
回滚;
更新:d.Spec.Template
有变化更新 ReplicaSet
,d.Spec
的其它字段变化实际就更新状态。
创建 Deployment
时,其实是在更新逻辑的 rolloutRolling
或 rolloutRecreate
方法中创建的新的 ReplicaSet
。
扩缩容和暂停 当前 Deployment
的状态如果是 .Spec.Paused
状态或 scaling
状态(dc.isScalingEvent
函数判断),就执行 dc.sync
方法同步 Deployment
状态。
dc.sync
方法负责协调 Deployment
的 scaling
状态事件或 paused
操作:
pkg/controller/deployment/sync.go:49 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false ) if err != nil { return err } if err := dc.scale(ctx, d, newRS, oldRSs); err != nil { return err } if d.Spec.Paused && getRollbackTo(d) == nil { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } allRSs := append (oldRSs, newRS) return dc.syncDeploymentStatus(ctx, allRSs, newRS, d) }
cleanupDeployment
执行的清理是根据配置的历史版本数保存上限,清理超出限制的历史版本。
scaling
状态scaling
状态的判断是通过 ReplicaSet
的 deployment.kubernetes.io/desired-replicas
注解进行的。
pkg/controller/deployment/sync.go:532 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool , error ) { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false ) if err != nil { return false , err } allRSs := append (oldRSs, newRS) logger := klog.FromContext(ctx) for _, rs := range controller.FilterActiveReplicaSets(allRSs) { desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs) if !ok { continue } if desired != *(d.Spec.Replicas) { return true , nil } } return false , nil }
如果下面两个值不一样就是 scaling
状态:
Deployment
期望的副本数 .Spec.Replicas
;
任一活跃 ReplicaSet
的注解 deployment.kubernetes.io/desired-replicas
。
获取所有 ReplicaSet
上面通过调用 dc.getAllReplicaSetsAndSyncRevision
方法获取 Deployment 的所有旧的 ReplicaSet
和当前最新的 ReplicaSet
。
pkg/controller/deployment/sync.go:116 1 2 3 4 5 6 7 func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision( ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool ) (*apps.ReplicaSet, []*apps.ReplicaSet, error ) { }
确定一个 ReplicaSet
是新 ReplicaSet
的方式是判断 Deployment
和 ReplicaSet
的 .Spec.Template
是否 Hash 相等。
当传入方法的第四个参数为 true
且找不到符合条件的新 ReplicaSet 时,会创建一个新的 ReplicaSet。
新 ReplicaSet
会使用 Deployment
对象去配置 .Spec.Replicas
和 DesiredReplicasAnnotation
注解:
.Spec.Replicas
:Deployment.Spec.Replicas + MaxSurge - 当前旧 Replicas 数量
,一般等于 MaxSurge
值;
滚动更新时,总 Pod 数量会比期望数量多一些,更新完成后会恢复为期望值;
替换更新时,直接等于 Deployment
期望值;
该值不会高于 Deployment
期望值;
DesiredReplicasAnnotation
:`Deployment.Spec.Replicas
所以,这里返回的 newRS
就是我们 Deployment
期望的最终状态。
活跃 ReplicaSet
所谓活跃的就是 .Spec.Replicas
大于 0 的。这里有三种情况:
单纯在调整 Deployment
的 Replicas
:只有一个活跃 的 ReplicaSet
;
从 0 副本扩容:旧 ReplicaSet
全都不活跃,使用传入的新 ReplicaSet
;
滚动升级:同时存在多个活跃的 ReplicaSet
,不满足条件,走下一个分支
旧的 ReplicaSet
:正在缩容
新的 ReplicaSet
:创建时设置了初始 .Spec.Replicas
,所以能被当成活跃的 ReplicaSet
99% 的时间里 Deployment
对应的活跃的 ReplicaSet
只有一个,只有更新时才会出现 2 个 ReplicaSet
,极少数情况下(短时间重复更新)才会出现 2 个以上的 ReplicaSet
。
scale 方法 接下来重点看一下 scale
方法:
pkg/controller/deployment/sync.go:298 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error { if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil { if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) { return nil } _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment) return err } if deploymentutil.IsSaturated(deployment, newRS) { for _, old := range controller.FilterActiveReplicaSets(oldRSs) { if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0 , deployment); err != nil { return err } } return nil } if deploymentutil.IsRollingUpdate(deployment) { allRSs := controller.FilterActiveReplicaSets(append (oldRSs, newRS)) allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) allowedSize := int32 (0 ) if *(deployment.Spec.Replicas) > 0 { allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment) } deploymentReplicasToAdd := allowedSize - allRSsReplicas var scalingOperation string switch { case deploymentReplicasToAdd > 0 : sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs)) scalingOperation = "up" case deploymentReplicasToAdd < 0 : sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs)) scalingOperation = "down" } deploymentReplicasAdded := int32 (0 ) nameToSize := make (map [string ]int32 ) for i := range allRSs { rs := allRSs[i] if deploymentReplicasToAdd != 0 { proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion deploymentReplicasAdded += proportion } else { nameToSize[rs.Name] = *(rs.Spec.Replicas) } } for i := range allRSs { rs := allRSs[i] if i == 0 && deploymentReplicasToAdd != 0 { leftover := deploymentReplicasToAdd - deploymentReplicasAdded nameToSize[rs.Name] = nameToSize[rs.Name] + leftover if nameToSize[rs.Name] < 0 { nameToSize[rs.Name] = 0 } } if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil { return err } } } return nil }
该方法主要是对 ReplicaSet
进行扩缩容操作,这个方法只有 scaling
事件和 paused
的 Deployment
中使用,正常的滚动更新不会走这里处理 :
首先获取活跃的 ReplicaSet
:
如果有一个活跃 ReplicaSet
(没有活跃就是最后一个 ReplicaSet
)则直接对该 ReplicaSet
进行 scale 操作(替换更新 和常规的的扩缩容在这里处理);
多个活跃 ReplicaSet
就进入下一面代码;
接下来就判断是否新 ReplicaSet
已经调整完毕:
判断新的 ReplicaSet
是否已经饱和,如饱和将旧 RS 缩容到 0;
没调整完说明同时存在老的 ReplicaSet
和新的 ReplicaSet
,需要适当控制 RS 的扩缩容以保证不超过 MaxSurge
。
只有策略是滚动升级才可能运行到这里,替换更新一般只进入第一个步骤就结束了;
此时滚动更新正在进行,紧接着进行扩缩容操作 。
新旧 ReplicaSet
扩缩容 当同时存在新旧的 ReplicaSet
时,在 scale
方法的最后对滚动升级的 ReplicaSet
进行调整。
这个新旧 ReplicaSet
扩缩容的动作,只是在滚动更新的同时又进行扩缩容操作 时进行,主要的逻辑将新增或减少的副本数先平均分摊 到所有活跃的 ReplicaSet
,再将剩余的 应用到原来副本数最多的 ReplicaSet
上(当副本数相同就比较创建时间,扩容选新,缩容选旧)。
详细逻辑如下:
首先通过预期 Replicas
数量和当前活跃 ReplicaSet
的副本总数,计算出要变动的 Pod
数量 deploymentReplicasToAdd
:
如果是负值:缩容,活跃 ReplicaSet
列表排序,数量多、旧的在前;
如果是正值:扩容,活跃 ReplicaSet
列表排序,数量多、新的在前。
遍历所有活跃 ReplicaSet
,计算每个 ReplicaSet
的副本数,将 deploymentReplicasToAdd
分摊到各个活跃的 ReplicaSet
上;
再次遍历所有活跃 ReplicaSet
,更新对应 ReplicaSet
的副本数并将多余的 deploymentReplicasToAdd
应用到第一个 ReplicaSet
:
如果是扩容,应用到数量最多或最新 的;
如果是缩容,应用到数量最多或最旧 的。
这里执行完后 scaling
事件就已经结束(deployment.kubernetes.io/desired-replicas
注解已更新成 Deployment 的 .Spec.Replicas
的值)。
后面继续走滚动更新的逻辑(rolloutRolling
)完成新旧 ReplicaSet
的滚动。
回滚 回滚操作的原理是:
复制历史的某个版本的 ReplicaSet
里的 podTemplate.Spec
;
替换当前 Deployment
的 .Spec.Template
,删除 rollback
注解;
下一次 Deployment
完成更新操作。
更新 这里的更新操作完成了 ReplicaSet
注解和 .Spec.Replicas
字段的操作,实际的扩缩容操作是由 ReplicaSetController
来进行的。
同时,只有 Deployment.Spec.Template
有变化才创建新的 ReplicaSet
并进行更新,而 Deployment.Spec
的其它字段变化只会更新 Deployment
和原有 ReplicaSet
的状态。
滚动更新 滚动更新 RollingUpdate
是默认的策略。Deployment
的 Spec.Template
字段的内容只要一更新就会生成新的 ReplicaSet
,并且基于新的 ReplicaSet
执行滚动更新,原有的 ReplicaSet
会进行滚动缩容。
pkg/controller/deployment/rolling.go:32 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true ) if err != nil { return err } allRSs := append (oldRSs, newRS) scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d) if err != nil { return err } if scaledUp { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d) if err != nil { return err } if scaledDown { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if deploymentutil.DeploymentComplete(d, &d.Status) { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } return dc.syncRolloutStatus(ctx, allRSs, newRS, d) }
滚动更新过程:
getAllReplicaSetsAndSyncRevision
:获取所有 ReplicaSet
,如果新 ReplicaSet
不存在就创建一个新的(参看上面:获取所有 ReplicaSet );
reconcileNewReplicaSet
对新 ReplicaSet
进行扩容;
reconcileOldReplicaSets
对旧 ReplicaSet
进行缩容;
对于新增的 ReplicaSet
,此时总副本数已经超过期望数,需要在这里对旧 ReplicaSet
进行缩容操作
更新 Deployment
状态
这里的扩缩容操作实际上是通过修改 .Spec.Replicas
和使用 deploymentutil.SetReplicasAnnotations
函数操作 ReplicaSet
注解 deployment.kubernetes.io/desired-replicas
实现的。
deployment.kubernetes.io/desired-replicas
注解会设置为 Deployment
的 .Spec.Replicas
的值。滚动的的过程是修改 ReplicaSet
的 .Spec.Replicas
。
整个滚动更新过程就是不断地扩容新 ReplicaSet
、缩容旧 ReplicaSet
再更新 Deployment
过程:
扩容新 ReplicaSet
的 .Spec.Replicas
:原值 + maxSurge
,直到等于 Deployment.Spec.Replicas
;
缩容所有旧 ReplicaSet
的 .Spec.Replicas
:总数缩减最多 maxUnavalible
,直到等于 0;
更新 Deployment
的 Status
,触发下一轮 reconcile。
多轮滚动后,新 ReplicaSet
副本数达到预期值,旧 ReplicaSet
副本数也缩减到 0,滚动更新结束。
替换更新 替换更新相比之下更为简单,和滚动更新不同的是替换更新先对原有的 ReplicaSet
进行缩容操作,直到所有的 Pod
都退出后再创建新的 ReplicaSet
并进行扩容。
pkg/controller/deployment/recreate.go:29 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map [types.UID][]*v1.Pod) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false ) if err != nil { return err } allRSs := append (oldRSs, newRS) activeOldRSs := controller.FilterActiveReplicaSets(oldRSs) scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d) if err != nil { return err } if scaledDown { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if oldPodsRunning(newRS, oldRSs, podMap) { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if newRS == nil { newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true ) if err != nil { return err } allRSs = append (oldRSs, newRS) } if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil { return err } if util.DeploymentComplete(d, &d.Status) { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } return dc.syncRolloutStatus(ctx, allRSs, newRS, d) }
不更新情况 前面说过,Deployment.Spec
的其它字段变化只会更新 Deployment
和原有 ReplicaSet
的状态。这里回顾一下 rolloutRolling
和 rolloutRecreate
的代码,看看是怎么实现的。
不更新情况 :指 Deployment.Spec.Template
没有发生变化,Deployment.Spec
有变化。
滚动更新 pkg/controller/deployment/rolling.go:32 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true ) if err != nil { return err } allRSs := append (oldRSs, newRS) scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d) if err != nil { return err } if scaledUp { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d) if err != nil { return err } if scaledDown { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if deploymentutil.DeploymentComplete(d, &d.Status) { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } return dc.syncRolloutStatus(ctx, allRSs, newRS, d) }
替换更新 pkg/controller/deployment/recreate.go:29 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map [types.UID][]*v1.Pod) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false ) if err != nil { return err } allRSs := append (oldRSs, newRS) activeOldRSs := controller.FilterActiveReplicaSets(oldRSs) scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d) if err != nil { return err } if scaledDown { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if oldPodsRunning(newRS, oldRSs, podMap) { return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if newRS == nil { newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true ) if err != nil { return err } allRSs = append (oldRSs, newRS) } if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil { return err } if util.DeploymentComplete(d, &d.Status) { if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } return dc.syncRolloutStatus(ctx, allRSs, newRS, d) }
引申 实现 Deployment 重启 平时使用 Deployment
部署开发环境时,想要重启应用很多时候都是直接把 Pod
删除来达到重启的目的。有没有一种更优雅的方法呢?
从上面对 DeploymentController
对于更新的实现可以知道,当 Deployment
的 .Spec.Template
发生变化时会触发更新流程。
我们可以在 .Spec.Template
里加个条注解(Annotations
)记录,值设定为当前的时间,这样就能触发更新流程,实现重启功能。
这样的做法其实是 kubectl rollout restart
命令的实现原理,实际上 POD Template 并没有改变,只是通过在 .spec.template.metadata.annotations
注解里增加或修改 kubectl.kubernetes.io/restartedAt
的时间戳来实现重启,并不会修改副本数。
vendor/k8s.io/kubectl/pkg/polymorphichelpers/objectrestarter.go:32 1 2 3 4 5 6 7 8 9 10 11 12 13 func defaultObjectRestarter (obj runtime.Object) ([]byte , error ) { switch obj := obj.(type ) { case *extensionsv1beta1.Deployment: if obj.Spec.Paused { return nil , errors.New("can't restart paused deployment (run rollout resume first)" ) } if obj.Spec.Template.ObjectMeta.Annotations == nil { obj.Spec.Template.ObjectMeta.Annotations = make (map [string ]string ) } obj.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt" ] = time.Now().Format(time.RFC3339) return runtime.Encode(scheme.Codecs.LegacyCodec(extensionsv1beta1.SchemeGroupVersion), obj) }
总结 Deployment
的 Pod
管理是通过 ReplicaSet
来进行的,DeploymentController
的代码也不涉及 Pod
的直接操作。
DeploymentController
对 Deployment
和 ReplicaSet
的操作并不是立即完成的,而是在控制循环中反复执行、收敛、修正,最终达到期望状态,完成更新。
引用
P3-Controller 分类与 Deployment Controller
当你创建了一个 Deployment 时,Kubernetes 内部发生了什么?