Kubernetes 内部很多核心组件是有状态的,都以一主多从多实例的方式运行。这些组件的一主多从中,只有主实例负责处理数据,从实例处在热备状态。当主实例异常时从实例将竞选成为主实例并接替进行任务处理,所以这个选举机制是 Kubernetes 对于这有状态组件高可用的保障。
核心组件如 kube-scheduler 或 kube-controller-manager 等组件,在同一时刻只有一个实例在处理业务逻辑,因此需要在启动的实例中进行选主,决定哪个实例负责处理任务。这些核心组件都是使用的 client-go 中提供的工具类 leaderelection
,也就是本文的主角。
leaderelection
依赖于 Kubernetes 中提供的 Endpoints
、ConfigMap
和 Lease
三种资源锁,leaderelection
选主的实现方式就是基于这三种资源锁:
- 多个副本去创建资源,创建成功则获得锁成为 leader;
- leader 在租约内去刷新锁;
- 其他副本则通过比对锁的更新时间,判断是否竞争成为 leader。
除了能在核心组件中使用,这个组件也能使用在我们开发的应用中,前提是我们的应用运行在 Kubernetes 环境且有操作资源锁的权限。
启动选举
如果我们想使用 leaderelection
组件实现选主的功能,那么在开始选举先需要先通过方法 resourcelock.New
获取资源锁的对象。Kubernetes 虽然现在只有 endpoints/configmap/lease 几种资源锁,但他们之间可以组合使用。
可选的资源锁组合如下:
资源锁组合1 2 3 4 5 6 7
| const ( endpointsResourceLock = "endpoints" configMapsResourceLock = "configmaps" LeasesResourceLock = "leases" EndpointsLeasesResourceLock = "endpointsleases" ConfigMapsLeasesResourceLock = "configmapsleases" )
|
另外,resourcelock.NewFromKubeconfig
方法也可以创建资源锁,该方法对 resourcelock.New
进行了封装。kube-controller-manager 就是使用这个方法创建的资源锁:
kube-controller-manager 创建资源锁1 2 3 4 5 6 7 8 9
| rl, err := resourcelock.NewFromKubeconfig(resourceLock, c.ComponentConfig.Generic.LeaderElection.ResourceNamespace, leaseName, resourcelock.ResourceLockConfig{ Identity: lockIdentity, EventRecorder: c.EventRecorder, }, c.Kubeconfig, c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
|
在 kube-controller-manager 中,resourceLock
参数默认传的是 endpointsleases
,所以会创建一个包含 endpointsLock
和 LeaseLock
的组合锁 MultiLock
,在操作锁的时候会先操作 endpointsLock
,成功再操作 LeaseLock
。
准备好资源锁后,调用方法 leaderelection.RunOrDie
开始选举。
启动选举1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ Lock: lock, LeaseDuration: 60 * time.Second, RenewDeadline: 15 * time.Second, RetryPeriod: 5 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { run(ctx) }, OnStoppedLeading: func() { klog.Infof("leader lost: %s", id) os.Exit(0) }, OnNewLeader: func(identity string) { if identity == id { return } klog.Infof("new leader elected: %s", identity) }, }, })
|
选举主流程
启动选举后,RunOrDie
方法会调用 le.Run(ctx)
方法开始真正的选举流程,该方法除非在以下情况下才会返回:
ctx
被取消(外部要求中止选举流程)
- 当选了 Leader 后,任期结束(网络或某种原因导致续期失败)
其它情况,比如当前节点未曾当选 Leader 则会卡在 acquire
方法中持续竞选。
le.Run(ctx)1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (le *LeaderElector) Run(ctx context.Context) { defer runtime.HandleCrash() defer func() { le.config.Callbacks.OnStoppedLeading() }()
if !le.acquire(ctx) { return } ctx, cancel := context.WithCancel(ctx) defer cancel() go le.config.Callbacks.OnStartedLeading(ctx) le.renew(ctx) }
|
竞选
acquire1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func (le *LeaderElector) acquire(ctx context.Context) bool { ctx, cancel := context.WithCancel(ctx) defer cancel() succeeded := false desc := le.config.Lock.Describe() klog.Infof("attempting to acquire leader lease %v...", desc) wait.JitterUntil(func() { succeeded = le.tryAcquireOrRenew(ctx) le.maybeReportTransition() if !succeeded { klog.V(4).Infof("failed to acquire lease %v", desc) return } le.config.Lock.RecordEvent("became leader") le.metrics.leaderOn(le.config.Name) klog.Infof("successfully acquired lease %v", desc) cancel() }, le.config.RetryPeriod, JitterFactor, true, ctx.Done()) return succeeded }
|
对于返回值,有以下情况:
succeeded
默认值是 false
,在竞选循环中,外部通过 ctx
中止竞选时会中止 wait.JitterUntil
循环并返回 false
;
- 如果竞选成功,则会改写
succeeded=true
并手动调用 cancel()
中止 wait.JitterUntil
循环。
所以 acquire
函数只会有两种情况会返回:
true
:当选 leader;
false
:外部中止竞选。
未当选的竞选者会在 wait.JitterUntil
循环中持续尝试。
抢锁和续期
抢锁操作和 Leader 续期都是在 tryAcquireOrRenew
方法中实现。
- 如果当前节点未取得锁,会尝试获取锁;
- 否则进行续期;
- 成功返回
true
,失败返回 false
。
tryAcquireOrRenew1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool { now := metav1.Now() leaderElectionRecord := rl.LeaderElectionRecord{ HolderIdentity: le.config.Lock.Identity(), LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), RenewTime: now, AcquireTime: now, }
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx) if err != nil { if !errors.IsNotFound(err) { klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) return false } if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil { klog.Errorf("error initially creating leader election record: %v", err) return false }
le.setObservedRecord(&leaderElectionRecord)
return true }
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) { le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord } if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() { klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) return false }
if le.IsLeader() { leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions } else { leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 }
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil { klog.Errorf("Failed to update lock: %v", err) return false }
le.setObservedRecord(&leaderElectionRecord) return true }
|
代码的流程如下:
任期的原理
代码中对是否在任期的判断是基于 le.observedTime
的,le.observedTime
是在 leader 发生变化时调用 le.setObservedRecord
方法更新的。更新时机:
- 锁不存在,创建锁成功时更新;
- 获取到锁记录和缓存的不同,说明上次尝试获取锁到现在的间隔内 leader 变化了,更新缓存;
- leader 超期没续期且当前节点抢锁成功,更新缓存。
相当于每次 le.observedTime
变化的时候都是监测到 leader 变化的时间,所以 le.observedTime + LeaseDuration
的时间就是 leader 当前任期的结束时间。
如果当前时间超过这个任期时间,但 leader 没及时刷新锁,就会导致获取到的锁记录 oldLeaderElectionRawRecord
和缓存的相同(无法满足更新时机 2),那么当前节点会走到后面抢锁的逻辑,执行锁更新的尝试。
抢锁的原理
进行锁更新尝试(抢锁)的原理是基于 kubernetes 的资源乐观锁实现的:
- 获取锁方法
le.config.Lock.Get(ctx)
会取得当前锁的最新 resourceVersion
并保存;
- 更新锁时提供保存的
resourceVersion
;
- Kubernetes 对比
resourceVersion
和最新值,如果相等则允许更新,返回成功,否则更新失败。
当 leader 在任期内时,通常只有 leader 自己去更新锁;而在抢锁阶段会有多个节点尝试更新,但只有第一个到达 Kubernetes 的更新请求会被处理,其它节点请求到达的时候 resourceVersion
已经被更新过了,所以请求会被拒绝。
续期处理循环
在获取到锁成为 leader 后,会进入 le.renew(ctx)
方法进行定期续期操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func (le *LeaderElector) renew(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() wait.Until(func() { timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline) defer timeoutCancel() err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) { return le.tryAcquireOrRenew(timeoutCtx), nil }, timeoutCtx.Done())
le.maybeReportTransition() desc := le.config.Lock.Describe() if err == nil { klog.V(5).Infof("successfully renewed lease %v", desc) return } le.config.Lock.RecordEvent("stopped leading") le.metrics.leaderOff(le.config.Name) klog.Infof("failed to renew lease %v: %v", desc, err) cancel() }, le.config.RetryPeriod, ctx.Done())
if le.config.ReleaseOnCancel { le.release() } }
|
renew
方法如果续期失败会直接返回,然后选主流程结束。其它主从竞选流程通常都会让当前节点重新成为备选节点并继续进行选主,而 leaderelection
是直接中止。
要想使用 leaderelection
实现重新竞选,需要自己再次调用 leaderelection.RunOrDie
重新开始或重启程序。
总结
leaderelection
流程中,只有 Leader 才是 Worker 节点,其它节点是热备节点,用于在 Leader 异常时及时接替工作。
kube-controller-manager
等 Kubernetes 内部组件在 leader 续期失败时都是直接退出程序,由 Kubernetes 保活机制重启程序,再重新加入竞选。
如 kube-controller-manager
是这样配置 leader 续期失败动作的:
kube-controller-manager 的 OnStoppedLeading1 2 3 4
| OnStoppedLeading: func() { klog.ErrorS(nil, "leaderelection lost") klog.FlushAndExit(klog.ExitFlushTimeout, 1) },
|
完整的 leaderelection
流程如图:
引用
- k8s 基于资源锁的选主分析
- 多实例 leader 选举