9. kubebuilder Advanced: Source Code Analysis

Source: https://lailin.xyz/post/operator-09-kubebuilder-code.html

Note: all of the example code in this article can be found in the blog-code repository.

In the previous articles we completed the development of a full Operator, touching on CRUD, finalizers (pre-delete handling), Status, Events, OwnerReference, and WebHooks, which covers most of the points you will meet when developing an Operator. kubebuilder does a lot of work for us, so that development basically only requires us to focus on a single Reconcile function. From another angle, though, kubebuilder is still a black box to us, and that raises a number of questions:

  • How does the Reconcile method get triggered?
  • How are the different resources recognized?
  • How does the whole thing fit together?

Let's first take a look at this architecture diagram from the official documentation[1]:

  • Process: the process started via main.go. Generally a Controller runs as a single process; if it is deployed for high availability there may be several.
  • Manager: each process has one Manager. It is the core component and is mainly responsible for:
    • exposing metrics
    • webhook certificates
    • initializing the shared cache
    • initializing the shared clients used to talk to the APIServer
    • running all of the Controllers
  • Client: generally, when we create, update, or delete a resource, we call the Client directly to talk to the APIServer.
  • Cache: responsible for syncing the resources the Controllers care about. Its core is a GVK -> Informer mapping; our Get and List calls are normally served from the Cache.
  • Controller: where the controller's business logic lives. A Manager may hold several Controllers, and we usually only need to implement the Reconcile method. The Predicate in the diagram is an event filter, which lets a Controller drop events it does not care about (see the sketch after this list).
  • WebHook: where admission control is implemented. There are two main kinds of interfaces: a MutatingAdmissionWebhook requires implementing the Defaulter interface, and a ValidatingAdmissionWebhook requires implementing the Validator interface.
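To give a taste of what a Predicate looks like, here is a minimal, hypothetical filter (not from the article's repo) that only lets an update event through when the object's generation changed, i.e. when .spec was modified:

import (
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

// hypothetical example: drop update events that did not change .spec
var onlySpecChanges = predicate.Funcs{
    UpdateFunc: func(e event.UpdateEvent) bool {
        // metadata.generation is only bumped by the APIServer when .spec changes
        return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
    },
}

A predicate like this can be attached globally with the builder's WithEventFilter method or per watch; controller-runtime also ships an equivalent predicate.GenerationChangedPredicate.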

Now that we understand the basic architecture, let's start from the entry point, main.go, and see what kubebuilder has quietly been doing behind the scenes.

main.go

// flag binding and error-check code omitted
func main() {
    var metricsAddr string
    var enableLeaderElection bool
    var probeAddr string

    ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "97acaccf.lailin.xyz",
        // CertDir: "config/cert/", // manually specify the cert location for testing
    })

    (&controllers.NodePoolReconciler{
        Client:   mgr.GetClient(),
        Log:      ctrl.Log.WithName("controllers").WithName("NodePool"),
        Scheme:   mgr.GetScheme(),
        Recorder: mgr.GetEventRecorderFor("NodePool"),
    }).SetupWithManager(mgr)

    (&nodesv1.NodePool{}).SetupWebhookWithManager(mgr)

    //+kubebuilder:scaffold:builder

    mgr.AddHealthzCheck("healthz", healthz.Ping)
    mgr.AddReadyzCheck("readyz", healthz.Ping)

    setupLog.Info("starting manager")
    mgr.Start(ctrl.SetupSignalHandler())
}

As you can see, main.go mainly does the startup work, including:

  • creating a Manager
  • using that Manager to create a Controller
  • setting up the WebHook
  • adding health checks
  • starting the Manager

Let's now follow the logic of the main function step by step.

NewManager

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
    // configuration-initialization code omitted

    // create the cache
    cache, err := options.NewCache(config,
        cache.Options{
            Scheme:    options.Scheme,     // the scheme passed in from main
            Mapper:    mapper,             // maps k8s API kinds to Go types
            Resync:    options.SyncPeriod, // defaults to 10 hours; usually leave it alone
            Namespace: options.Namespace,  // the namespace to watch
        })

    // create the clients that talk to the APIServer, with reads and writes separated
    clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}
    apiReader, err := client.New(config, clientOptions)

    writeObj, err := options.ClientBuilder.
        WithUncached(options.ClientDisableCacheFor...).
        Build(cache, config, clientOptions)

    if options.DryRunClient {
        writeObj = client.NewDryRunClient(writeObj)
    }

    // create the event recorder
    recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)

    // if high availability is required, create the leader-election configuration
    leaderConfig := config
    if options.LeaderElectionConfig != nil {
        leaderConfig = options.LeaderElectionConfig
    }
    resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
        LeaderElection:             options.LeaderElection,
        LeaderElectionResourceLock: options.LeaderElectionResourceLock,
        LeaderElectionID:           options.LeaderElectionID,
        LeaderElectionNamespace:    options.LeaderElectionNamespace,
    })

    // create the metrics and health-check listeners
    metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)

    // By default we have no extra endpoints to expose on metrics http server.
    metricsExtraHandlers := make(map[string]http.Handler)

    // Create health probes listener. This will throw an error if the bind
    // address is invalid or already in use.
    healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
    if err != nil {
        return nil, err
    }

    // finally, put all of this configuration into the manager
    return &controllerManager{
        config:                  config,
        scheme:                  options.Scheme,
        cache:                   cache,
        fieldIndexes:            cache,
        client:                  writeObj,
        apiReader:               apiReader,
        recorderProvider:        recorderProvider,
        resourceLock:            resourceLock,
        mapper:                  mapper,
        metricsListener:         metricsListener,
        metricsExtraHandlers:    metricsExtraHandlers,
        logger:                  options.Logger,
        elected:                 make(chan struct{}),
        port:                    options.Port,
        host:                    options.Host,
        certDir:                 options.CertDir,
        leaseDuration:           *options.LeaseDuration,
        renewDeadline:           *options.RenewDeadline,
        retryPeriod:             *options.RetryPeriod,
        healthProbeListener:     healthProbeListener,
        readinessEndpointName:   options.ReadinessEndpointName,
        livenessEndpointName:    options.LivenessEndpointName,
        gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
        internalProceduresStop:  make(chan struct{}),
    }, nil
}

Creating the Cache

func New(config *rest.Config, opts Options) (Cache, error) {
    opts, err := defaultOpts(config, opts)
    if err != nil {
        return nil, err
    }
    im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
    return &informerCache{InformersMap: im}, nil
}

This mainly calls the NewInformersMap method to create the Informer mappings:

func NewInformersMap(config *rest.Config,
    scheme *runtime.Scheme,
    mapper meta.RESTMapper,
    resync time.Duration,
    namespace string) *InformersMap {

    return &InformersMap{
        structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace),
        unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
        metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace),

        Scheme: scheme,
    }
}

NewInformersMap creates the structured, unstructured, and metadata-only InformerMaps respectively. All of these ultimately call the newSpecificInformersMap method; the only difference is the createListWatcherFunc argument each one passes in.

func newSpecificInformersMap(config *rest.Config,
    scheme *runtime.Scheme,
    mapper meta.RESTMapper,
    resync time.Duration,
    namespace string,
    createListWatcher createListWatcherFunc) *specificInformersMap {
    ip := &specificInformersMap{
        config:            config,
        Scheme:            scheme,
        mapper:            mapper,
        informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
        codecs:            serializer.NewCodecFactory(scheme),
        paramCodec:        runtime.NewParameterCodec(scheme),
        resync:            resync,
        startWait:         make(chan struct{}),
        createListWatcher: createListWatcher,
        namespace:         namespace,
    }
    return ip
}

newSpecificInformersMap is similar to the regular InformersMap; the difference is that it does not implement the WaitForCacheSync method.
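To make the role of informersByGVK and createListWatcher concrete, here is a heavily simplified, hypothetical sketch of the lazy GVK -> Informer lookup these maps perform. The real code in cache/internal also handles starting the informer and tracking cache sync, which is elided here:

// hypothetical, simplified sketch of the GVK -> Informer lookup
func (ip *specificInformersMap) informerFor(gvk schema.GroupVersionKind) (*MapEntry, error) {
    ip.mu.Lock()
    defer ip.mu.Unlock()

    // reuse an existing informer for this GVK if we already built one
    if entry, ok := ip.informersByGVK[gvk]; ok {
        return entry, nil
    }

    // otherwise build the ListWatch for the GVK and wrap it in a SharedIndexInformer
    lw, err := ip.createListWatcher(gvk, ip)
    if err != nil {
        return nil, err
    }
    obj, err := ip.Scheme.New(gvk)
    if err != nil {
        return nil, err
    }
    entry := &MapEntry{
        Informer: cache.NewSharedIndexInformer(lw, obj, ip.resync,
            cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
    }
    ip.informersByGVK[gvk] = entry
    return entry, nil
}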

Taking the structured variant's createStructuredListWatch as an example, it mainly returns a ListWatch object that is used to create a SharedIndexInformer:

func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
    // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
    // groupVersionKind to the Resource API we will use.
    mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    if err != nil {
        return nil, err
    }

    client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
    if err != nil {
        return nil, err
    }
    listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
    listObj, err := ip.Scheme.New(listGVK)
    if err != nil {
        return nil, err
    }

    // TODO: the functions that make use of this ListWatch should be adapted to
    // pass in their own contexts instead of relying on this fixed one here.
    ctx := context.TODO()
    // Create a new ListWatch for the obj
    return &cache.ListWatch{
        ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
            res := listObj.DeepCopyObject()
            isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
            err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
            return res, err
        },
        // Setup the watch function
        WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
            // Watch needs to be set to true separately
            opts.Watch = true
            isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
            return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
        },
    }, nil
}

To sum up: the cache mainly creates a set of InformerMaps that implement the GVK -> Informer mapping; each Informer Lists and Watches its corresponding GVK via the ListWatch functions.
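From user code you rarely touch these maps directly; the cache exposes them through methods like GetInformer. Here is a small hypothetical sketch (the helper name is invented) of pulling an informer out of the manager's cache and hanging a raw client-go handler on it:

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    toolscache "k8s.io/client-go/tools/cache"
    ctrl "sigs.k8s.io/controller-runtime"
)

// hypothetical helper: obtain (and implicitly create) the Pod informer
func addPodLogger(ctx context.Context, mgr ctrl.Manager) error {
    inf, err := mgr.GetCache().GetInformer(ctx, &corev1.Pod{})
    if err != nil {
        return err
    }
    // a plain client-go event handler; controller-runtime normally wraps
    // handlers so that they enqueue reconcile.Requests instead
    inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            ctrl.Log.Info("pod added", "name", pod.Name)
        },
    })
    return nil
}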

Creating the Client

func New(config *rest.Config, options Options) (Client, error) {
    if config == nil {
        return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
    }

    // Init a scheme if none provided
    if options.Scheme == nil {
        options.Scheme = scheme.Scheme
    }

    // Init a Mapper if none provided
    if options.Mapper == nil {
        var err error
        options.Mapper, err = apiutil.NewDynamicRESTMapper(config)
        if err != nil {
            return nil, err
        }
    }

    clientcache := &clientCache{
        config: config,
        scheme: options.Scheme,
        mapper: options.Mapper,
        codecs: serializer.NewCodecFactory(options.Scheme),

        structuredResourceByType:   make(map[schema.GroupVersionKind]*resourceMeta),
        unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
    }

    rawMetaClient, err := metadata.NewForConfig(config)
    if err != nil {
        return nil, fmt.Errorf("unable to construct metadata-only client for use as part of client: %w", err)
    }

    c := &client{
        typedClient: typedClient{
            cache:      clientcache,
            paramCodec: runtime.NewParameterCodec(options.Scheme),
        },
        unstructuredClient: unstructuredClient{
            cache:      clientcache,
            paramCodec: noConversionParamCodec{},
        },
        metadataClient: metadataClient{
            client:     rawMetaClient,
            restMapper: options.Mapper,
        },
        scheme: options.Scheme,
        mapper: options.Mapper,
    }

    return c, nil
}

Two clients are created here: one for reads and one for writes. The read client is backed by the cache described above, while only the write client talks to the APIServer directly.
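The split itself is assembled by the ClientBuilder seen in New above. As a rough sketch, hedged against the delegating-client API of the controller-runtime versions this article is based on, the result is equivalent to something like:

import (
    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// illustrative sketch: build a read/write-split client by hand
func newSplitClient(c cache.Cache, direct client.Client) (client.Client, error) {
    return client.NewDelegatingClient(client.NewDelegatingClientInput{
        CacheReader: c,      // Get/List served from the informer caches
        Client:      direct, // Create/Update/Patch/Delete go to the APIServer
    })
}

Here c and direct stand in for the cache and the direct client created earlier; the names are illustrative.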

Controller

Next, let's look at how the core Controller is initialized and how it does its work.

if err = (&controllers.NodePoolReconciler{
    Client:   mgr.GetClient(),
    Log:      ctrl.Log.WithName("controllers").WithName("NodePool"),
    Scheme:   mgr.GetScheme(),
    Recorder: mgr.GetEventRecorderFor("NodePool"),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "NodePool")
    os.Exit(1)
}

This code in main.go mainly initializes the Controller struct and then calls its SetupWithManager method.

// SetupWithManager sets up the controller with the Manager.
func (r *NodePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&nodesv1.NodePool{}).
        Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}).
        Complete(r)
}

As covered earlier, SetupWithManager uses the builder pattern to declare the objects we want to watch; only events on those objects will trigger our Reconcile logic. The Complete call at the end actually invokes the Build method.
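For intuition, what the builder assembles is roughly equivalent to creating a named controller and registering the watches by hand. A hedged sketch of the same setup without the builder (not the literal generated code; the controller name and nodesv1 import path are assumptions):

import (
    corev1 "k8s.io/api/core/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/controller"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/source"
    // nodesv1: the API package generated by kubebuilder (assumed import path)
)

func setupByHand(mgr ctrl.Manager, r *NodePoolReconciler) error {
    c, err := controller.New("nodepool", mgr, controller.Options{Reconciler: r})
    if err != nil {
        return err
    }
    // For(&nodesv1.NodePool{}) becomes a watch that enqueues the object itself
    if err := c.Watch(&source.Kind{Type: &nodesv1.NodePool{}}, &handler.EnqueueRequestForObject{}); err != nil {
        return err
    }
    // Watches(...) becomes a watch with our custom handler
    return c.Watch(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler})
}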

func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
    // parameter validation omitted

    // Set the Config
    blder.loadRestConfig()

    // Set the ControllerManagedBy
    if err := blder.doController(r); err != nil {
        return nil, err
    }

    // Set the Watch
    if err := blder.doWatch(); err != nil {
        return nil, err
    }

    return blder.ctrl, nil
}

Build mainly calls two methods: doController and doWatch.

func (blder *Builder) doController(r reconcile.Reconciler) error {
    ctrlOptions := blder.ctrlOptions
    if ctrlOptions.Reconciler == nil {
        ctrlOptions.Reconciler = r
    }

    // Retrieve the GVK from the object we're reconciling
    // to prepopulate logger information, and to optionally generate a default name.
    gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
    if err != nil {
        return err
    }

    // Setup the logger.
    if ctrlOptions.Log == nil {
        ctrlOptions.Log = blder.mgr.GetLogger()
    }
    ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

    // Build the controller and return.
    blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
    return err
}

doController mainly initializes a Controller. It is handed the Reconciler we implemented, and the name derived from our GVK is obtained here as well.
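getGvk ultimately leans on the scheme to map a Go type back to its GroupVersionKind; apiutil.GVKForObject does the same job. A tiny hypothetical example (the group shown is an assumption, not taken from the article):

import (
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// hypothetical example of recovering the GVK from a Go type via the scheme
func printNodePoolGVK(mgr ctrl.Manager) error {
    gvk, err := apiutil.GVKForObject(&nodesv1.NodePool{}, mgr.GetScheme())
    if err != nil {
        return err
    }
    // e.g. Group "nodes.lailin.xyz" (assumed), Version "v1", Kind "NodePool"
    ctrl.Log.Info("resolved", "gvk", gvk)
    return nil
}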

func (blder *Builder) doWatch() error {
    // Reconcile type
    typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
    if err != nil {
        return err
    }
    src := &source.Kind{Type: typeForSrc}
    hdler := &handler.EnqueueRequestForObject{}
    allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
    if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
        return err
    }

    // Watches the managed types
    for _, own := range blder.ownsInput {
        typeForSrc, err := blder.project(own.object, own.objectProjection)
        if err != nil {
            return err
        }
        src := &source.Kind{Type: typeForSrc}
        hdler := &handler.EnqueueRequestForOwner{
            OwnerType:    blder.forInput.object,
            IsController: true,
        }
        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        allPredicates = append(allPredicates, own.predicates...)
        if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
            return err
        }
    }

    // Do the watch requests
    for _, w := range blder.watchesInput {
        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        allPredicates = append(allPredicates, w.predicates...)

        // If the source of this watch is of type *source.Kind, project it.
        if srckind, ok := w.src.(*source.Kind); ok {
            typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
            if err != nil {
                return err
            }
            srckind.Type = typeForSrc
        }

        if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
            return err
        }
    }
    return nil
}

Watch is what subscribes to changes in the resources we care about. In blder.ctrl.Watch(src, hdler, allPredicates...), allPredicates is the list of filters: only when every predicate returns true is an event handed to the EventHandler hdler, and this handler is what gets registered on the Informer.
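As an example of such an EventHandler, here is a hedged sketch of what a handler.Funcs update callback like the nodeUpdateHandler registered in SetupWithManager could look like (the label name is an assumption; the real implementation is in the blog-code repository):

import (
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// hypothetical: map a Node update back to the NodePool that should be reconciled
func (r *NodePoolReconciler) nodeUpdateHandler(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
    // assumption: the pool name is carried on the Node in a label
    if pool, ok := e.ObjectNew.GetLabels()["node-pool"]; ok {
        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: pool}})
    }
}

The watches are now registered, but nothing runs until the Manager starts. So next, let's look at controllerManager.Start: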

func (cm *controllerManager) Start(ctx context.Context) (err error) {
    cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

    // used to signal that all goroutines have exited
    stopComplete := make(chan struct{})
    defer close(stopComplete)

    // ......

    // used to collect errors
    cm.errChan = make(chan error)

    // start the metrics server if metrics are enabled
    if cm.metricsListener != nil {
        go cm.serveMetrics()
    }

    // start the health-check server
    if cm.healthProbeListener != nil {
        go cm.serveHealthProbes()
    }

    go cm.startNonLeaderElectionRunnables()

    go func() {
        if cm.resourceLock != nil {
            err := cm.startLeaderElection()
            if err != nil {
                cm.errChan <- err
            }
        } else {
            // Treat not having leader election enabled the same as being elected.
            close(cm.elected)
            go cm.startLeaderElectionRunnables()
        }
    }()

    // decide whether we need to exit
    select {
    case <-ctx.Done():
        // We are done
        return nil
    case err := <-cm.errChan:
        // Error starting or running a runnable
        return err
    }
}

Whether or not this instance is the leader, the Controllers are ultimately started through startRunnable.

func (cm *controllerManager) startNonLeaderElectionRunnables() {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    cm.waitForCache(cm.internalCtx)

    // Start the non-leaderelection Runnables after the cache has synced
    for _, c := range cm.nonLeaderElectionRunnables {
        // Controllers block, but we want to return an error if any have an error starting.
        // Write any Start errors to a channel so we can return them
        cm.startRunnable(c)
    }
}

This in turn calls the Controller's Start method.

// Start implements controller.Controller
func (c *Controller) Start(ctx context.Context) error {

    // a Controller can only be started once
    c.mu.Lock()
    if c.Started {
        return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
    }

    // Set the internal context.
    c.ctx = ctx

    // create the work queue
    c.Queue = c.MakeQueue()
    defer c.Queue.ShutDown()

    err := func() error {
        defer c.mu.Unlock()

        defer utilruntime.HandleCrash()

        // start all the registered event sources
        for _, watch := range c.startWatches {
            c.Log.Info("Starting EventSource", "source", watch.src)
            if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
                return err
            }
        }

        // start the Controller, waiting for each watch's cache to sync first
        c.Log.Info("Starting Controller")

        for _, watch := range c.startWatches {
            syncingSource, ok := watch.src.(source.SyncingSource)
            if !ok {
                continue
            }
            if err := syncingSource.WaitForSync(ctx); err != nil {
                // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
                // Leaving it here because that could happen in the future
                err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
                c.Log.Error(err, "Could not wait for Cache to sync")
                return err
            }
        }

        // All the watches have been started, we can reset the local slice.
        //
        // We should never hold watches more than necessary, each watch source can hold a backing cache,
        // which won't be garbage collected if we hold a reference to it.
        c.startWatches = nil

        if c.JitterPeriod == 0 {
            c.JitterPeriod = 1 * time.Second
        }

        // Launch workers to process resources
        c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
        ctrlmetrics.WorkerCount.WithLabelValues(c.Name).
            Set(float64(c.MaxConcurrentReconciles))
        for i := 0; i < c.MaxConcurrentReconciles; i++ {
            go wait.UntilWithContext(ctx, func(ctx context.Context) {
                // keep pulling items off the queue; each one triggers our Reconcile logic
                for c.processNextWorkItem(ctx) {
                }
            }, c.JitterPeriod)
        }

        c.Started = true
        return nil
    }()
    if err != nil {
        return err
    }

    <-ctx.Done()
    c.Log.Info("Stopping workers")
    return nil
}

// attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    obj, shutdown := c.Queue.Get()
    if shutdown {
        // Stop working
        return false
    }

    // We call Done here so the workqueue knows we have finished
    // processing this item. We also must remember to call Forget if we
    // do not want this work item being re-queued. For example, we do
    // not call Forget if a transient error occurs, instead the item is
    // put back on the workqueue and attempted again after a back-off
    // period.
    defer c.Queue.Done(obj)

    ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
    defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

    c.reconcileHandler(ctx, obj)
    return true
}

So the Reconcile method is triggered like this: the Informers in the Cache receive resource change events, and those events are pushed through a producer-consumer pattern (the work queue) into the Reconcile method we implemented ourselves.
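Boiled down, the worker loop above is a standard workqueue producer-consumer. Here is a self-contained, hedged sketch of the same pattern; it is simplified, since the real Controller adds predicates, metrics, and requeue-after-result handling:

import (
    "context"

    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// producer side (normally an informer event handler):
//   q.Add(reconcile.Request{NamespacedName: ...})
//
// consumer side: one worker goroutine draining the queue
func runWorker(ctx context.Context, r reconcile.Reconciler, q workqueue.RateLimitingInterface) {
    for {
        item, shutdown := q.Get() // blocks until an event handler Adds something
        if shutdown {
            return
        }
        req := item.(reconcile.Request)
        if _, err := r.Reconcile(ctx, req); err != nil {
            q.AddRateLimited(item) // transient error: retry with backoff
        } else {
            q.Forget(item) // success: reset the rate limiter for this key
        }
        q.Done(item)
    }
}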

Kubebuilder is an excellent framework for Operator development. It not only greatly simplifies the development process, but also makes full use of Go interfaces to leave plenty of room for extension. That is something worth learning from: if our own business frameworks reached this level, I would consider them pretty good.
