gofunc() { deferfunc() { if r := recover(); r != nil { panicCh <- r } }() // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first // list request will return the full response. pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { return r.listerWatcher.List(opts) })) switch { case r.WatchListPageSize != 0: pager.PageSize = r.WatchListPageSize case r.paginatedResult: case options.ResourceVersion != "" && options.ResourceVersion != "0": pager.PageSize = 0 }
当 client-go 请求 API Server 并带了 watch 参数时,API Server 在响应头中会带有 Transfer-Encoding: chunked,表示使用分块传输编码(参考:staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go:164);
for { // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors select { case <-stopCh: returnnil default: }
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent start := r.clock.Now()
if w == nil { timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options := metav1.ListOptions{ ResourceVersion: r.LastSyncResourceVersion(), // We want to avoid situations of hanging watchers. Stop any watchers that do not // receive any events within the timeout window. TimeoutSeconds: &timeoutSeconds, // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks. // Reflector doesn't assume bookmarks are returned at all (if the server do not support // watch bookmarks, it will ignore this field). AllowWatchBookmarks: true, }
informers map[reflect.Type]cache.SharedIndexInformer // startedInformers is used for tracking which informers have been started. // This allows Start() to be called multiple times safely. startedInformers map[reflect.Type]bool }
type DeltaFIFO struct { // lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex cond sync.Cond
// `items` maps a key to a Deltas. // Each such Deltas has at least one Delta. items map[string]Deltas
// `queue` maintains FIFO order of keys for consumption in Pop(). // There are no duplicates in `queue`. // A key is in `queue` if and only if it is in `items`. queue []string
// ...
// knownObjects list keys that are "known" --- affecting Delete(), // Replace(), and Resync() knownObjects KeyListerGetter
// DeltaType describes the kind of change recorded for an object
// (e.g. an addition, an update, or a deletion).
type DeltaType string

// Delta records a single change to one object: the kind of change and
// the object's state after that change took effect.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas is an ordered list of changes for a single object: index 0
// holds the oldest change and the final element holds the newest.
type Deltas []Delta
switch event.Type { case watch.Added: err := store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err)) } case watch.Modified: err := store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err)) } case watch.Deleted: // TODO: Will any consumers need access to the "last known // state", which is passed in event.Object? If so, may need // to change this. err := store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err)) } case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) }
iflen(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas // 广播通知消费 f.cond.Broadcast() } else { // 不会进入这里,如果 newDeltas 不为空,dedupDeltas 不会返回空列表 // This never happens, because dedupDeltas never returns an empty list // when given a non-empty list (as it is here). // If somehow it happens anyway, deal with it but complain. if oldDeltas == nil { klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) returnnil } klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) f.items[id] = newDeltas return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } returnnil }
func(c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
// Pop blocks until the queue holds at least one item, then removes the
// oldest key, runs `process` on its accumulated Deltas while still
// holding the FIFO lock, and returns the item. If `process` returns an
// ErrRequeue, the item is put back (unless a newer entry for the key
// already exists) and the wrapped error is returned. Returns
// ErrFIFOClosed once Close() has been called and the queue drains.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}

			// Queue is empty: release the lock and block until Broadcast.
			f.cond.Wait()
		}
		isInInitialList := !f.hasSynced_locked()
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		// Drop the cached Deltas now that the key has been dequeued;
		// ownership transfers to the caller below.
		delete(f.items, id)
		// Emit a trace when the backlog is deep, which indicates slow
		// event handlers blocking the queue.
		if depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			defer trace.LogIfLong(100 * time.Millisecond)
		}
		// Invoke the caller-supplied processing callback.
		err := process(item, isInInitialList)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}
if deltas, ok := obj.(Deltas); ok { return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList) } return errors.New("object given as Process argument is not Deltas") }
funcprocessDeltas( // Object which receives event notifications from the given deltas handler ResourceEventHandler, clientState Store, transformer TransformFunc, deltas Deltas, isInInitialList bool, )error { // from oldest to newest for _, d := range deltas { obj := d.Object if transformer != nil { var err error obj, err = transformer(obj) if err != nil { return err } }
func(s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) { // Invocation of this function is locked under s.blockDeltas, so it is // save to distribute the notification s.cacheMutationDetector.AddObject(obj) s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false) }
for listener, isSyncing := range p.listeners { switch { case !sync: // non-sync messages are delivered to every listener listener.add(obj) case isSyncing: // sync messages are delivered to every syncing listener listener.add(obj) default: // skipping a sync obj for a non-syncing listener } } }
// Separate stop channel because Processor should be stopped strictly after controller processorStopCh := make(chanstruct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop deferclose(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) wg.StartWithChannel(processorStopCh, s.processor.run) // 下略 }
这个 run 方法的功能主要就是启动所有 listener 的 pop 和 run 协程并等待退出消息,当收到退出消息后关闭这些协程:
func(p *processorListener) add(notification interface{}) { if a, ok := notification.(addNotification); ok && a.isInInitialList { p.syncTracker.Start() } p.addCh <- notification }
该方法直接把传入的 notification 发送到 p.addCh 管道。这个管道在创建时没有设置大小,是无缓冲管道(发送会一直阻塞,直到有消费者接收),所以 pop 方法必须尽快取走数据,不然会影响 DeltaFIFO 的消费。
// 获取 Indexer 所有 key,并传入 syncKeyLocked 处理 keys := f.knownObjects.ListKeys() for _, k := range keys { if err := f.syncKeyLocked(k); err != nil { return err } } returnnil }
func(f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) if err != nil { klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) returnnil } elseif !exists { klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) returnnil }
// `*cache` implements Indexer in terms of a ThreadSafeStore and an // associated KeyFunc. type cache struct { // cacheStorage bears the burden of thread safety for the cache cacheStorage ThreadSafeStore // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc }