Kubernetes 的 Informer 机制

Kubernetes 的 Informer 机制

在 Kubernetes 中,kube-apiserver 是整个集群的大脑和心脏,是控制集群的入口,所有模块都是通过其提供的 HTTP REST API 接口来操作集群的。

由于是所有模块的数据交互和通信的枢纽,大量组件直接通过 HTTP 请求 apiserver 带来的访问压力是非常大的。一但 apiserver 出现异常,整个集群就会受到影响,甚至崩溃。

所以尽可能降低 apiserver 的访问压力是很有必要的,Informer 机制就是 Kubernetes 解决这个问题的方案。Informer 本质就是 client-go 提供的一种本地缓存机制:

  • 通过在本地缓存一份准实时的 Kubernetes 资源数据,应用在查询时直接从本地查询;
  • 当资源变化时通过长连接将变更推送到本地 Informer 并更新本地缓存;
  • 变更缓存后,触发本地的处理函数执行相关业务逻辑。

通过 Informer 机制,大大降低了 Kubernetes 各个组件跟与 API Server 的通信压力,同时 ETCD 的查询压力也同样得到缓解。

Informer 机制架构设计

下面这张图来自官方文档《client-go under the hood》,展现了 client-go 中各组件的工作原理和与自定义 Controller 的交互流程。

图中的组件分为上下两部分,分别是 client-go 组件和自定义 Controller 组件。Informer 机制指的是就是这一整套 Controller 的交互流程。

client-go 组件

  • Reflector:指的是 cache 包中定义的 Reflector 类,用于监控 Kubernetes 资源变化,其功能由 ListAndWatch 函数实现。当 Reflector 接收到资源变更的事件,会获取到变更的对象并在函数 watchHandler 中放到 Delta Fifo 队列。
  • Delta FIFO:是一个 FIFO 的队列,用来缓存 Reflector 拉取到的变更事件和资源对象;
  • Informor:是流程中最重要的节点,是整个流程的桥梁,Informer 也是在 cache 包中定义的,其功能在 processLoop 函数中实现,负责:
    • 从 Delta FIFO 中 pop 出对象并更新到 Indexer 的 cache 中;
    • 调用自定义 Controller,传递该对象。
  • Indexer:指在 cache 包中定义的 Indexer 类,主要是在资源对象上提供了索引和本地缓存的功能。经典的使用场景是基于对象的 Labels 创建索引,Indexer 可以支持使用索引函数来维护索引,同时 Indexer 使用线程安全的 Data Store 来存储资源对象和对应的 Key。默认使用的是 cache 包里的 MetaNamespaceKeyFunc 函数来生成对象的 Key,格式如:<namespace>/<name>

自定义组件

上图中 Informer referenceIndexer reference 是指在自定义 Controller 中需要自己创建的 Informer 和 Indexer 的实例,用来与整个流程进行交互,需要根据需要的资源创建对应的实例。client-go 提供了 NewIndexerInformer 函数来创建 Informer 和 Indexer 实例,也可以使用 SharedInformerFactory工厂方法来创建实例。

每个资源都会对应一个 Informer,每个 Informer 都通过 Watch 创建一个长连接。如果一个资源创建了多个 Informer 无疑是非常浪费的,所以通常都使用 SharedInformerFactory 工厂方法来创建,这样每种资源都复用一个 Informer,从而降低开销。

Reflector

Reflector 用于监控 Kubernetes 资源变化,当资源发生变化时将资源对象更新到本地缓存 Delta FIFO 中,其主要功能由 ListAndWatch 函数实现,后面详细介绍,现在看一下 Reflector 的创建过程。

Reflector 创建

直接使用 NewReflectorNewReflectorWithOptions 就可以实例化 Reflector 对象,但常见的使用方式是使用 SharedInformerFactory 来创建。在使用 SharedInformerFactory 工厂方法创建 Informer 后,Informer 启动时会自动创建 Reflector。

SharedInformerFactory 需要使用 Start 方法启动 Informer,Start 执行时会运行所有 Informer 的 Run 方法:

staging/src/k8s.io/client-go/informers/factory.go:133
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()

if f.shuttingDown {
return
}

for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
f.wg.Add(1)
informer := informer
go func() {
defer f.wg.Done()
// 启动 informer
informer.Run(stopCh)
}()
f.startedInformers[informerType] = true
}
}
}

sharedIndexInformerinformer) 的 Run 方法中会构造一个 Controller 对象,该对象的 Run 函数使用 NewReflectorWithOptions 函数构造了一个 Reflector 对象。

staging/src/k8s.io/client-go/tools/cache/controller.go:129
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflectorWithOptions(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod,
TypeDescription: c.config.ObjectDescription,
},
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}

c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()

var wg wait.Group

// 启动 Reflector
wg.StartWithChannel(stopCh, r.Run)

wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}

上面在创建 Reflector 后也会调用 Run 方法启动 Reflector 的处理流程。

staging/src/k8s.io/client-go/tools/cache/reflector.go:272
1
2
3
4
5
6
7
8
9
10
11
// Run repeatedly uses the reflector's ListAndWatch to fetch all the// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}

核心逻辑在 ListAndWatch 函数中,主要功能包括:

  • 拉取全量数据,初始化 Delta FIFO 数据;
  • 启动 Resync 机制
  • 监控资源变更。

Resync 机制后面 Delta FIFO 部分再说,这里先看看其它两部分。

拉取数据

启动时,首次是直接拉取全量数据的,完整的实现在 r.list(stopCh) 调用的函数中,该函数主要流程如下:

  • r.listerWatcher.List(opts):调用拉取全量数据
  • meta.ExtractList(list)runtime.Object 转换成 []runtime.Object 列表
  • r.syncWith(items, resourceVersion):同步数据到 Delta FIFO 队列中
  • r.setLastSyncResourceVersion(resourceVersion):刷新版本号

拉取全量数据时,为避免给服务器造成太大压力,首先使用的是分页方式分片拉取:

staging/src/k8s.io/client-go/tools/cache/reflector.go:420
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
case options.ResourceVersion != "" && options.ResourceVersion != "0":
pager.PageSize = 0
}

list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()

最终调用的是 r.listerWatcher.List(opts) 方法来拉取数据,这个方法会基于 ResourceVersion 获取指定资源下所有对象。比如 Pod 最终调用的是 Pod Informer 的 ListFunc 方法,通过 client-go 向 Kubernetes 发起 API 请求来获取资源数据。

监控资源变更

ListAndWatch 函数的最后一部分逻辑是监控资源变化的,原理是通过 HTTP 与 APIServer 建立长连接,基于 HTTP 协议的分块传输编码(chunked)实现:

  • 当 client-go 请求 API Server 并带了 watch 参数时,API Server 在响应中头中会带有 Transfer-Encoding: chunked,表示使用分块传输编码(参考staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go:164);
  • client-go 通过创建 StreamWatcher 的方式创建一个管道,监听新的数据并传回(参考staging/src/k8s.io/client-go/rest/request.go:765)。

Reflector 里,Watch 管道通过调用 r.listerWatcher.Watch(options) 方法创建,这个方法最终由 Informer 的 WatchFunc 实现,如 Pod 的 Informer 是这样实现的:

staging/src/k8s.io/client-go/informers/core/v1/pod.go
1
2
3
4
5
6
7
8
9
10
11
12
13
func NewFilteredPodInformer(...) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
// ...
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(options)
},
},
// ...
)
}

可以看出最终还是由 client-go 封装好的方法实现具体的功能,上面提过,client-go 内部会创建一个 StreamWatcher 对象返回,后面可以通过 w.ResultChan() 管道获取数据。

获取到 StreamWatcher 对象后,调用 watchHandler 进入监控处理逻辑。当有数据通过 w.ResultChan() 管道传递过来时,根据不同的事件类型调用 Delta FIFO 的不同方法更新缓存。

新的数据会带来新的 resourceVersion,处理完数据对应的事件后会通过 setLastSyncResourceVersion(resourceVersion) 方法更新当前 Watch 的数据版本。当网络原因等导致 watch 的长连接中断后,会基于本地数据版本的 resourceVersion 重新建立 watch 连接。

staging/src/k8s.io/client-go/tools/cache/reflector.go:405
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
var err error
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)

for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}

// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()

if w == nil {
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}

w, err = r.listerWatcher.Watch(options)
// ... 略
}

watchHandler() 方法中,更新同步 resourceVersion 的代码片断如下:

staging/src/k8s.io/client-go/tools/cache/reflector.go:741
1
2
3
4
5
6
7
8
9
10
11
12
13
resourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
// ... 略
}
setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(resourceVersion)
}

Informer

Informer 是流程中最重要的节点,是整个流程的桥梁,这也是为什么常把这个机制叫 Informer 的原因。

使用 Informer 可以参考官方自定义 Controller 的例子,这里把 Informer 操作提取出来:

Informer Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
clientset, err := kubernetes.NewForConfig(config)
stopCh := make(chan struct{})
defer close(stopch)

informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute)

informer := informerFactory.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {

},
UpdateFunc: func(oldObj, newObj interface{}) {

},
DeleteFunc: func(obj interface{}) {

},
})

informerFactory.Start(stopCh)

资源的 Informer

在上面的 Informer 操作例子中可以看到,创建 Pod 对象的 Informer 时使用的是 Core().V1().Pods().Informer() 这样的调用,返回 cache.SharedIndexInformer 接口的实例。

Pod 资源的核心逻辑都集中在 SharedIndexInformer 实例构造方法中:

staging/src/k8s.io/client-go/informers/core/v1/pod.go:58
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}

SharedIndexInformer 的构造使用的是通用的构造工厂方法 cache.NewSharedIndexInformer,可自定义内容只有:

  • 提供如何拉取数据和创建监控资源变化的方法(ListFuncWatchFunc 会提供给 Reflector 进行调用)
  • Informer 的资源对象
  • Indexers 和 resync 间隔时间

得益于这样优秀的封装,client-go 使用 informer-gen 生成了所有 Kubernetes 资源的 Informer 代码。

Informer 共享机制

Reflector 创建那一小节就提到过,开发 Controller 通常都是使用 SharedInformerFactory 来创建 Informer 的。在 SharedInformerFactory 中,同类型的资源会共享一个 Reflector,获取 Informer 时会先在内部的缓存里查询,如果不存在对应的 Informer 才会创建一个新的,否则复用缓存的。

SharedInformerFactory 缓存 Informer 的数据结构如下:

staging/src/k8s.io/client-go/informers/factory.go
1
2
3
4
5
6
7
8
type sharedInformerFactory struct {
// ...

informers map[reflect.Type]cache.SharedIndexInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[reflect.Type]bool
}

当调用 informerFactory.Core().V1().Pods().Informer() 获取 Informer 实例时,最终调用的是 podInformer 结构的方法:

staging/src/k8s.io/client-go/informers/core/v1/pod.go:84
1
2
3
4
5
6
7
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

可以看出,Informer() 是调用 SharedInformerFactoryInformerFor 方法来创建创建 SharedIndexInformer 实例的,这个方法最终通过 f.defaultInformer 调用的 NewFilteredPodInformer 来实现。

具体看一下 InformerFor 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

// 检查是否已存在 Informer
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

// 是否自定义同步时间间隔
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}

// 构建新的 Informer
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer

return informer
}

InformerFor 函数的实现很简单,和我们常写的单例工厂方法差不太多。

DeltaFIFO

前面多次提到了 DeltaFIFO,其实这是个缓冲队列,用来保存从 Reflector 拉取来的数据,在存入 DeltaFIFO 时会转换成操作事件对象。

DeltaFIFO 的数据结构:

staging/src/k8s.io/client-go/tools/cache/delta_fifo.go:97
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond

// `items` maps a key to a Deltas.
// Each such Deltas has at least one Delta.
items map[string]Deltas

// `queue` maintains FIFO order of keys for consumption in Pop().
// There are no duplicates in `queue`.
// A key is in `queue` if and only if it is in `items`.
queue []string

// ...

// knownObjects list keys that are "known" --- affecting Delete(),
// Replace(), and Resync()
knownObjects KeyListerGetter

// ...
}

DeltaFIFO 保存了对资源对象的操作,如对对象的 AddedUpdatedDeletedSync 等操作,这个对象叫 Delta,结构如下:

staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
type Delta struct {
Type DeltaType
Object interface{}
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta

DeltaFIFO 结构中:

  • queue 字段存储资源的 key,由 KeyOf 函数计算得到;
  • items 字段存储的 Deltas 数组,是具体的资源事件内容。

存储结构示意如图:

DeltaFIFO 结构

生产者逻辑

Reflector 调用 AddUpdateReplace 等方法,往 DeltaFIFO 队列中增加数据。在 Reflector 的 watchHandler 方法中有如下代码:

staging/src/k8s.io/client-go/tools/cache/reflector.go:575
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}

这段代码就是根据监控到的数据变化类型的不同,调用不同的 DeltaFIFO 方法,最后都是调用的 queueActionLocked 这个方法将变更入队。

staging/src/k8s.io/client-go/tools/cache/delta_fifo.go:413
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 计算 obj 的 key
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
// 去重
newDeltas = dedupDeltas(newDeltas)

if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
// 广播通知消费
f.cond.Broadcast()
} else {
// 不会进入这里,如果 newDeltas 不为空,dedupDeltas 不会返回空列表
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}

在成功入队后会通过 f.cond.Broadcast() 广播通知消费者进行消费。

消费者逻辑

消费者是通过 Pop 方法进行消费的,在 Informer 启动的 controller 中,使用 wait.Until 启动了一个协程去消费 DeltaFIFO 的数据:

staging/src/k8s.io/client-go/tools/cache/controller.go:129
1
2
3
4
5
6
7
8
9
10
11
12
13
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()

// 这里省略了 reflector 启动代码

// 启动 DeltaFIFO 消费协程
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}

processLoop 函数使用一个无限循环去消费 DeltaFIFO 队列里的数据:

staging/src/k8s.io/client-go/tools/cache/controller.go:186
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

Pop 方法需要传入一个 PopProcessFunc 类型的处理函数,当数据成功出队后会调用这个函数处理数据。这里的 c.config.Process 是在 Informer 中定义的 s.HandleDeltas 方法,在 controller 构造时传入的。

HandleDeltas 调用 processDeltas 方法进行处理,这个方法做两件事:

  • 更新 Indexer 缓存;
  • 触发注册的 Handler 处理事件。

先看看 DeltaFIFO 出队方法 Pop 是怎么实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}

// 队列为空时,阻塞等待
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
// 出队后删除缓存的数据
delete(f.items, id)
// 调用跟踪日志
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
// 调用处理函数
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}

当 DeltaFIFO 队列不为空时,取出 f.queue 第一个元素,并调用消费者函数进行处理,如果处理函数返回 ErrRequeue 错误会重新入队。

Pop 返回后,processLoop 会再次进行判断,如果开启了 c.config.RetryOnError 功能也会重新入队进行重试。

HandleDeltas 实现只是简单对 obj 进行了类型检查,处理逻辑在 processDeltas 方法中:

staging/src/k8s.io/client-go/tools/cache/shared_informer.go:635
1
2
3
4
5
6
7
8
9
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}

在处理前,对 blockDeltas 锁进行了加锁操作,这个锁的用处是保证当注册新的 Event Handler 时能暂停事件分发操作,避免并发问题。AddEventHandler 等方法注册事件处理器时也会加这个锁。

processDeltas 方法对 deltas 进行遍历更新缓存和触发事件:

staging/src/k8s.io/client-go/tools/cache/controller.go:447
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
isInInitialList bool,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}

switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
// 更新 Indexer
if err := clientState.Update(obj); err != nil {
return err
}
// 触发 event handler 的 OnUpdate 方法
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}

代码中的 handler 对应的对象是 Informer 对象,这个 Informer 对象也实现了 ResourceEventHandler 接口,并将调用代理给内部的 processor

staging/src/k8s.io/client-go/tools/cache/shared_informer.go:646
1
2
3
4
5
6
func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
}

Processor 逻辑

前面代码中 Informer事件的响应处理是将事件代理给 processor 处理, processor 是维护的是之前注册到 Informer 中 Event Handler 的,distribute 的逻辑是将事件转发给这些方法进行处理:

staging/src/k8s.io/client-go/tools/cache/shared_informer.go:776
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()

for listener, isSyncing := range p.listeners {
switch {
case !sync:
// non-sync messages are delivered to every listener
listener.add(obj)
case isSyncing:
// sync messages are delivered to every syncing listener
listener.add(obj)
default:
// skipping a sync obj for a non-syncing listener
}
}
}

distribute 代码中的 listenerprocessorListener 类型的对象,这个对象是在向 Informer 注册 Event Handler 时调用的 AddEventHandlerWithResyncPeriod 方法中创建的:

1
2
3
4
5
6
7
8
9
10
11
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
// 上面代码略

listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)

if !s.started {
return s.processor.addListener(listener), nil
}

// 下面代码略
}

可以看出,一个 handler 对应一个 listener,但是在调 listener.add(obj) 时并不是直接调用 handler 的,这里面另有玄机。

在 Informer 的启动代码中启动了一个协程 s.processor.run,下面是 Informer 启动代码:

staging/src/k8s.io/client-go/tools/cache/shared_informer.go:458
1
2
3
4
5
6
7
8
9
10
11
12
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// 上略

// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
// 下略
}

这个 run 方法的功能主要就是启动所有 listenerpoprun 协程并等待退出消息,当收到退出消息后关闭这些协程:

staging/src/k8s.io/client-go/tools/cache/shared_informer.go:794
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh

p.listenersLock.Lock()
defer p.listenersLock.Unlock()
for listener := range p.listeners {
// 关闭 pop
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}

// Wipe out list of listeners since they are now closed
// (processorListener cannot be re-used)
p.listeners = nil

// Reset to false since no listeners are running
p.listenersStarted = false

// 等待 pop 和 run 关闭
p.wg.Wait() // Wait for all .pop() and .run() to stop
}

s.processor.run 启动了两个协程:

  • pop:从 p.addCh 管道里取出推送的事件,经过一个 Ring Buffer 缓冲,再通过 p.nextCh 管道传递给 run 协程,p.addCh 管道关闭时关闭 p.nextCh 管道(通知 run 退出);
  • run:从 p.nextCh 管道中取出数据,调用注册的 handler 处理,当 p.nextCh 管道关闭时,退出协程。
staging/src/k8s.io/client-go/tools/cache/shared_informer.go:933
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop

var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// 上个 notification 发送成功
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // 缓冲中无待发数据
nextCh = nil // 禁用当前分支
}
case notificationToAdd, ok := <-p.addCh:
// 当 p.addCh 关闭时退出方法
if !ok {
return
}
// 若当前无数据待发送且 pendingNotifications 缓冲无数据
if notification == nil {
// 直接将获取到的新数据设置为待发送
// 无需加到缓冲中
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else {
// 已经有 notification 待发送,放入缓冲
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}

func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
// 从 p.nextCh 获取事件并调用
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj, notification.isInInitialList)
if notification.isInInitialList {
p.syncTracker.Finished()
}
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// p.nextCh 关闭时退出
close(stopCh)
}, 1*time.Second, stopCh)
}

pop 方法使用 select 多分支并用 Ring Buffer 的好外是,当 run 协程来不及处理时,新来的数据可以进入第二分支,将数据放到 pendingNotifications 中缓存。可以保证 processor 能一直接受新数据。

现在回头看消费 DeltaFIFO 时调用的 distribute 方法里,最终调用的 listener.add(obj) 是怎么实现的:

staging/src/k8s.io/client-go/tools/cache/shared_informer.go:926
1
2
3
4
5
6
func (p *processorListener) add(notification interface{}) {
if a, ok := notification.(addNotification); ok && a.isInInitialList {
p.syncTracker.Start()
}
p.addCh <- notification
}

该方法直接把传入的 notification 发送到 p.addCh 管道,这个管道在创建时没有设置大小,没有缓存(无消费阻塞),所以 pop 方法必须尽快取走数据,不然会影响 DeltaFIFO 的消费。

pop 和 run 处理流程

Resync 机制

在 Reflector 主流程 ListAndWatch 中增提过 DeltaFIFO 的 Resync 机制,Resync 机制存在的作用是让处理失败的事件有重新处理的机会。

在处理 Informer 事件回调时,可能存在处理失败的情况,且由前面讨论过的 run 流程可知,报错的事件会被跳过,并不会重试报错的事件。

Resync 机制会定期将 Indexer 中的缓存同步到 DeltaFIFO 中,重新走一遍消费流程,不过与之前不同的是 Resync 的数据的事件类型是 Sync

staging/src/k8s.io/client-go/tools/cache/delta_fifo.go:666
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()

if f.knownObjects == nil {
return nil
}

// 获取 Indexer 所有 key,并传入 syncKeyLocked 处理
keys := f.knownObjects.ListKeys()
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}

// 如果 DeltaFIFO 中已经存在同样 Key 的数据,说明有新 event,忽略
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if len(f.items[id]) > 0 {
return nil
}

// 入队操作
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}

通过 Resync 机制产生的事件会以 updateNotification 的形式发送,最终触发 onUpdate 事件回调。

Indexer

Indexer 是 client-go 里存储 Kubernetes 资源的本地缓存,当创建资源的 Informer,对应资源的全量数据都会缓存在对应的 Indexer 中并通过 Reflector 监听变更同步更新。client-go 查询时就可以优先查询本地缓存,降低 Kubernetes APIServer 和 ETCD 的压力。

Indexer 内部基于 ThreadSafeMap 实现:

staging/src/k8s.io/client-go/tools/cache/store.go:139
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// `*cache` implements Indexer in terms of a ThreadSafeStore and an
// associated KeyFunc.
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
}

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}

NewThreadSafeStore 使用的 threadSafeMap 来创建:

1
2
3
4
5
6
7
8
9
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{},
index: &storeIndex{
indexers: indexers,
indices: indices,
},
}
}

threadSafeMap

threadSafeMap 是线程安全的 Map,类似于 Go 的 sync.Map,只是 Kubernetes 开发时还没有 sync.Map

threadSafeMap 实现了 ThreadSafeStore 接口,而 Indexer 接口和 ThreadSafeStore 接口是一样的,Indexer 是对 threadSafeMap 的封装,增加了 keyFunc 的功能。threadSafeMap 结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}

// index implements the indexing functionality
index *storeIndex
}

type storeIndex struct {
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}

type Index map[string]sets.String

// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]Index

threadSafeMap 中各字段的作用:

  • items 用于存储缓存的数据,key -> 资源对象
  • indexers 存储的是索引生成函数的名字和函数引用,索引生成函数名字 -> 函数
  • indices 是存储索引生成函数名字和用这个函数生成的索引数据,生成函数名字 -> 索引数据
  • Index 存储的是索引数据,索引(索引生成函数产生) -> key 列表

threadSafeMap

SharedIndexInformer 中,默认使用 DeletionHandlingMetaNamespaceKeyFunc 生成 items <namespace>/<name> 格式的 key。

Indexer 索引器

从前面 NewIndexer 函数可以看出,Indexer 支持自定义索引函数。在之前讨论Informer 共享机制里,Pod Informer 创建时调用的 defaultInformer 方法使用了一个默认的索引函数:

defaultInformer 中创建 Indexer
1
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}

该索引函数使用资源的 namespace 创建索引,返回一个资源对象的索引列表,依照这个实现实现一个自定义的:

使用 Indexer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func UidIndexFunc(obj interfaces{}) ([]string, error) {
pod := obj.(*v1.Pod)
uid := pod.Annotations["uid"]
return []string{uid}, nil
}

func main() {
index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"userId": UidIndexFunc})
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test", Annotations: map[string]string{"uid": "id1"}}}

index.Add(pod)

pods, err := index.ByIndex("userId", "id1")
}

上面代码中展示的是独立使用 Indexer 的情况,若想与 SharedInformerFactory 结合使用,可以调用 SharedIndexInformerAddIndexers 方法,下面代码来自 external-provisioner 的用法:

vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/v8/controller/controller.go:687
1
2
3
4
5
6
7
8
9
10
11
12
13
if controller.claimInformer != nil {
controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
} else {
controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
controller.claimInformer.AddEventHandler(claimHandler)
}
err = controller.claimInformer.AddIndexers(cache.Indexers{uidIndex: func(obj interface{}) ([]string, error) {
uid, err := getObjectUID(obj)
if err != nil {
return nil, err
}
return []string{uid}, nil
}})

Indexer 索引查询

查询数据可以使用 Indexer 的 Get 方法,更常用的是使用 ByIndex 方法,能与自定义的索引函数结合使用,如 external-provisioner 里获取数据的方法:

vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/v8/controller/controller.go:1015
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (ctrl *ProvisionController) syncClaimHandler(ctx context.Context, key string) error {
// 使用自定义的索引函数获取数据
objs, err := ctrl.claimsIndexer.ByIndex(uidIndex, key)
if err != nil {
return err
}
var claimObj interface{}
if len(objs) > 0 {
claimObj = objs[0] // 基于业务理解,通常一个 uid 只有一个对象
} else {
obj, found := ctrl.claimsInProgress.Load(key)
if !found {
utilruntime.HandleError(fmt.Errorf("claim %q in work queue no longer exists", key))
return nil
}
claimObj = obj
}
return ctrl.syncClaim(ctx, claimObj)
}

ByIndex 方法接收两个参数:

  • indexName:索引函数名;
  • indexedValue:索引值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()

// 查询索引函数下,该索引值对应的 key 列表
set, err := c.index.getKeysByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}

// key 列表查原始资源对象
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}

return list, nil
}

func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.String, error) {
// 查询索引函数名是否存在
indexFunc := i.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 从索引函数对应的 索引 中取出指定 索引值关联的 key 列表
index := i.indices[indexName]
return index[indexedValue], nil
}

方法的逻辑:

  • 检查给定的索引函数名是否存在;
  • 取出索引函数名对应的索引数据;
  • 从索引数据中取出给定索引值所关联的 key 列表;
  • 使用 key 列表从 items 中获取资源对象列表并返回。

总结

本文从 Informer 机制的设计原因和整体架构开始,深入讨论了 Informer 机制,了解了支撑 Informer 机制实现的 Reflector、Informer、DeltaFIFO 和 Indexer 几个内部组件的实现原理。

作为 client-go 重要的组成部分,基本上 Kubernetes 自定义组件都离不开 Informer,Kubernetes 之所以设计这样一个结构,核心需求是为了减少 Kubernetes API Server 和 ETCD 的压力,增强整个集群的稳定性。

整体流程
代码调用关系

引用

  1. 深入了解 Kubernetes Informer
  2. P2-Controller 与 informer
  3. Informer 中为什么需要引入 Resync 机制?
  4. client-go under the hood

修订历史

  • 2023-05-16:细化补充 ReflectorWatch 相关逻辑,更新流程图增加 Informer Lister 入口说明。
作者

Jakes Lee

发布于

2023-02-15

更新于

2023-05-16

许可协议

评论