Kubernetes 核心组件 Leader 选举机制

Kubernetes 核心组件 Leader 选举机制

Kubernetes 内部很多核心组件是有状态的,都以一主多从多实例的方式运行。这些组件的一主多从中,只有主实例负责处理数据,从实例处在热备状态。当主实例异常时从实例将竞选成为主实例并接替进行任务处理,所以这个选举机制是 Kubernetes 对于这有状态组件高可用的保障。

核心组件如 kube-scheduler 或 kube-controller-manager 等组件,在同一时刻只有一个实例在处理业务逻辑,因此需要在启动的实例中进行选主,决定哪个实例负责处理任务。这些核心组件都是使用的 client-go 中提供的工具类 leaderelection,也就是本文的主角。

leaderelection 依赖于 Kubernetes 中提供的 EndpointsConfigMapLease 三种资源锁,leaderelection 选主的实现方式就是基于这三种资源锁:

  • 多个副本去创建资源,创建成功则获得锁成为 leader;
  • leader 在租约内去刷新锁;
  • 其他副本则通过比对锁的更新时间,判断是否竞争成为 leader。

除了能在核心组件中使用,这个组件也能使用在我们开发的应用中,前提是我们的应用运行在 Kubernetes 环境且有操作资源锁的权限。

启动选举

如果我们想使用 leaderelection 组件实现选主的功能,那么在开始选举先需要先通过方法 resourcelock.New 获取资源锁的对象。Kubernetes 虽然现在只有 endpoints/configmap/lease 几种资源锁,但他们之间可以组合使用。

可选的资源锁组合如下:

资源锁组合
1
2
3
4
5
6
7
const (
endpointsResourceLock = "endpoints"
configMapsResourceLock = "configmaps"
LeasesResourceLock = "leases"
EndpointsLeasesResourceLock = "endpointsleases"
ConfigMapsLeasesResourceLock = "configmapsleases"
)

另外,resourcelock.NewFromKubeconfig 方法也可以创建资源锁,该方法对 resourcelock.New 进行了封装。kube-controller-manager 就是使用这个方法创建的资源锁:

kube-controller-manager 创建资源锁
1
2
3
4
5
6
7
8
9
rl, err := resourcelock.NewFromKubeconfig(resourceLock,  
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
leaseName,
resourcelock.ResourceLockConfig{
Identity: lockIdentity,
EventRecorder: c.EventRecorder,
},
c.Kubeconfig,
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)

在 kube-controller-manager 中,resourceLock 参数默认传的是 endpointsleases,所以会创建一个包含 endpointsLockLeaseLock 的组合锁 MultiLock,在操作锁的时候会先操作 endpointsLock,成功再操作 LeaseLock

准备好资源锁后,调用方法 leaderelection.RunOrDie 开始选举。

启动选举
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
// 资源锁类型
Lock: lock,
// 租约时长,非主候选者用来判断资源锁是否过期
LeaseDuration: 60 * time.Second,
// leader刷新资源锁超时时间
RenewDeadline: 15 * time.Second,
// 调用资源锁间隔
RetryPeriod: 5 * time.Second,
// 回调函数,根据选举不同事件触发
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
run(ctx)
},
OnStoppedLeading: func() {
klog.Infof("leader lost: %s", id)
os.Exit(0) // 通常要退出程序,重启后重新开始选主,否则将不会参与到选主中
},
OnNewLeader: func(identity string) {
if identity == id {
return
}
klog.Infof("new leader elected: %s", identity)
},
},
})

选举主流程

启动选举后,RunOrDie 方法会调用 le.Run(ctx) 方法开始真正的选举流程,该方法除非在以下情况下才会返回:

  • ctx 被取消(外部要求中止选举流程)
  • 当选了 Leader 后,任期结束(网络或某种原因导致续期失败)

其它情况,比如当前节点未曾当选 Leader 则会卡在 acquire 方法中持续竞选。

le.Run(ctx)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (le *LeaderElector) Run(ctx context.Context) {
defer runtime.HandleCrash()
defer func() {
// 当方法退出时回调通知任期结束
le.config.Callbacks.OnStoppedLeading()
}()

// 开始竞选
if !le.acquire(ctx) {
// ctx 被取消,中止选举
return // ctx signalled done
}
// 当选 leader
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// 通知执行 leader 任务
go le.config.Callbacks.OnStartedLeading(ctx)
// 执行 leader 续期
le.renew(ctx)
}

竞选

acquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (le *LeaderElector) acquire(ctx context.Context) bool {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// 默认 false,当未当选且 ctx 被取消时返回
succeeded := false
desc := le.config.Lock.Describe()
klog.Infof("attempting to acquire leader lease %v...", desc)
// 循环竞选
// 间隔 RetryPeriod 执行一次,直到 ctx.Done()
wait.JitterUntil(func() {
// 竞选
succeeded = le.tryAcquireOrRenew(ctx)
// leader 变化回调
le.maybeReportTransition()
// 竞选失败返回
if !succeeded {
klog.V(4).Infof("failed to acquire lease %v", desc)
return
}
// 成功则退出竞选函数
le.config.Lock.RecordEvent("became leader")
le.metrics.leaderOn(le.config.Name)
klog.Infof("successfully acquired lease %v", desc)
cancel()
}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
return succeeded
}

对于返回值,有以下情况:

  • succeeded 默认值是 false,在竞选循环中,外部通过 ctx 中止竞选时会中止 wait.JitterUntil 循环并返回 false
  • 如果竞选成功,则会改写 succeeded=true 并手动调用 cancel() 中止 wait.JitterUntil 循环。

所以 acquire 函数只会有两种情况会返回:

  • true:当选 leader;
  • false:外部中止竞选。

未当选的竞选者会在 wait.JitterUntil 循环中持续尝试。

抢锁和续期

抢锁操作和 Leader 续期都是在 tryAcquireOrRenew 方法中实现。

  • 如果当前节点未取得锁,会尝试获取锁;
  • 否则进行续期;
  • 成功返回 true,失败返回 false
tryAcquireOrRenew
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.Now()
// 使用默认值初始化记录对象,HolderIdentity 为当前竞选者标识
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
RenewTime: now,
AcquireTime: now,
}

// 获取锁记录
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
// 如果锁不存在则尝试创建
if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
klog.Errorf("error initially creating leader election record: %v", err)
return false
}

// 创建成功则取锁成功,更新当前监控锁记录内容
le.setObservedRecord(&leaderElectionRecord)

return true
}

// 检查获取的数据是否更新,更新则刷新本地缓存
// 如果有变化,说明上次尝试获取锁到现在的间隔内 leader 变化了
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
le.setObservedRecord(oldLeaderElectionRecord)

le.observedRawRecord = oldLeaderElectionRawRecord
}
// 检查是否被其它人持有,且任期未结束
// le.observedTime 由 le.setObservedRecord 方法更新,在 leader 发生变化时更新
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
!le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}

// 使用正确数据填充锁记录
if le.IsLeader() {
// 如果当前是 leader,则是续期操作,使用记录里的取锁时间和变化次数
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
} else {
// 如果不是 leader 说明正在抢锁,将 leader 变化次数加 1
// AcquireTime 已经在上面默认设置成当前时间
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}

// 尝试更新锁
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
return false
}

// 更新锁成功则说明当前节点持有锁:抢锁成功/续期成功
le.setObservedRecord(&leaderElectionRecord)
return true
}

代码的流程如下:

tryAcquireOrRenew

任期的原理

代码中对是否在任期的判断是基于 le.observedTime 的,le.observedTime 是在 leader 发生变化时调用 le.setObservedRecord 方法更新的。更新时机:

  1. 锁不存在,创建锁成功时更新;
  2. 获取到锁记录和缓存的不同,说明上次尝试获取锁到现在的间隔内 leader 变化了,更新缓存;
  3. leader 超期没续期且当前节点抢锁成功,更新缓存。

相当于每次 le.observedTime 变化的时候都是监测到 leader 变化的时间,所以 le.observedTime + LeaseDuration 的时间就是 leader 当前任期的结束时间。

如果当前时间超过这个任期时间,但 leader 没及时刷新锁,就会导致获取到的锁记录 oldLeaderElectionRawRecord 和缓存的相同(无法满足更新时机 2),那么当前节点会走到后面抢锁的逻辑,执行锁更新的尝试。

抢锁的原理

进行锁更新尝试(抢锁)的原理是基于 kubernetes 的资源乐观锁实现的:

  • 获取锁方法 le.config.Lock.Get(ctx) 会取得当前锁的最新 resourceVersion 并保存;
  • 更新锁时提供保存的 resourceVersion
  • Kubernetes 对比 resourceVersion 和最新值,如果相等则允许更新,返回成功,否则更新失败。

当 leader 在任期内时,通常只有 leader 自己去更新锁;而在抢锁阶段会有多个节点尝试更新,但只有第一个到达 Kubernetes 的更新请求会被处理,其它节点请求到达的时候 resourceVersion 已经被更新过了,所以请求会被拒绝。

续期处理循环

在获取到锁成为 leader 后,会进入 le.renew(ctx) 方法进行定期续期操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (le *LeaderElector) renew(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// 定期续期,每 RetryPeriod 执行一次续期
wait.Until(func() {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
// 执行续期,直到成功或超时
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
return le.tryAcquireOrRenew(timeoutCtx), nil
}, timeoutCtx.Done())

le.maybeReportTransition()
desc := le.config.Lock.Describe()
// 没报错说明续期成功
if err == nil {
klog.V(5).Infof("successfully renewed lease %v", desc)
return
}
// 超时错误,说明续期失败,当前不再是 leader,退出续期流程
le.config.Lock.RecordEvent("stopped leading")
le.metrics.leaderOff(le.config.Name)
klog.Infof("failed to renew lease %v: %v", desc, err)
cancel()
}, le.config.RetryPeriod, ctx.Done())

// if we hold the lease, give it up
if le.config.ReleaseOnCancel {
le.release()
}
}

renew 方法如果续期失败会直接返回,然后选主流程结束。其它主从竞选流程通常都会让当前节点重新成为备选节点并继续进行选主,而 leaderelection 是直接中止。

要想使用 leaderelection 实现重新竞选,需要自己再次调用 leaderelection.RunOrDie 重新开始或重启程序。

总结

leaderelection 流程中,只有 Leader 才是 Worker 节点,其它节点是热备节点,用于在 Leader 异常时及时接替工作。

kube-controller-manager 等 Kubernetes 内部组件在 leader 续期失败时都是直接退出程序,由 Kubernetes 保活机制重启程序,再重新加入竞选。

kube-controller-manager 是这样配置 leader 续期失败动作的:

kube-controller-manager 的 OnStoppedLeading
1
2
3
4
OnStoppedLeading: func() {
klog.ErrorS(nil, "leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
},

完整的 leaderelection 流程如图:

完整流程

引用

  1. k8s 基于资源锁的选主分析
  2. 多实例 leader 选举
作者

Jakes Lee

发布于

2023-01-18

更新于

2023-02-15

许可协议

评论