1 Basic Framework
After you run the kubebuilder init command, KubeBuilder generates the basic framework of the Controller.
    package main

    import (
        "flag"
        "os"

        "k8s.io/apimachinery/pkg/runtime"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        clientgoscheme "k8s.io/client-go/kubernetes/scheme"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/healthz"
        "sigs.k8s.io/controller-runtime/pkg/log/zap"
    )

    var (
        scheme   = runtime.NewScheme()
        setupLog = ctrl.Log.WithName("setup")
    )

    func init() {
        utilruntime.Must(clientgoscheme.AddToScheme(scheme))
        //+kubebuilder:scaffold:scheme
    }

    func main() {
        // Bind command-line flags
        var metricsAddr string
        var enableLeaderElection bool
        var probeAddr string
        flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
        flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
        flag.BoolVar(&enableLeaderElection, "leader-elect", false,
            "Enable leader election for controller manager. "+
                "Enabling this will ensure there is only one active controller manager.")
        opts := zap.Options{
            Development: true,
        }
        opts.BindFlags(flag.CommandLine)
        flag.Parse()
        ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

        // Create a Manager
        mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
            Scheme:                 scheme,
            MetricsBindAddress:     metricsAddr,
            Port:                   9443,
            HealthProbeBindAddress: probeAddr,
            LeaderElection:         enableLeaderElection,
            LeaderElectionID:       "67295132.shiori.cn",
        })
        if err != nil {
            setupLog.Error(err, "unable to start manager")
            os.Exit(1)
        }
        //+kubebuilder:scaffold:builder

        // Add health checks
        if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
            setupLog.Error(err, "unable to set up health check")
            os.Exit(1)
        }
        if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
            setupLog.Error(err, "unable to set up ready check")
            os.Exit(1)
        }

        // Start the Manager
        setupLog.Info("starting manager")
        if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
            setupLog.Error(err, "problem running manager")
            os.Exit(1)
        }
    }
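1.1 Scheme
A Scheme maps Go types to Kubernetes API kinds (group, version, kind), and the Client in the controller-runtime package relies on it to encode and decode objects.
Every resource type the Client will use must therefore be registered in the generated scheme object.
Registration is simple: whether the API client code is generated with KubeBuilder or in the native Kubernetes way, a corresponding AddToScheme() function is generated as well.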
    var (
        scheme   = runtime.NewScheme()
        setupLog = ctrl.Log.WithName("setup")
    )

    func init() {
        utilruntime.Must(clientgoscheme.AddToScheme(scheme)) // built-in Kubernetes resources
        utilruntime.Must(v1beta1.AddToScheme(scheme))        // custom resources
        //+kubebuilder:scaffold:scheme
    }
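To make the role of registration more concrete, here is a minimal standard-library sketch of the idea. It is a toy model, not the controller-runtime implementation: ToyScheme, GVK, AddKnownType, KindFor, and the placeholder Pod and Cluster types are all hypothetical names introduced for illustration. It only shows that a lookup for a type fails until that type has been registered.

```go
package main

import (
	"fmt"
	"reflect"
)

// GVK is a simplified stand-in for a group/version/kind triple.
type GVK struct {
	Group, Version, Kind string
}

// ToyScheme is a hypothetical, minimal model of a scheme: a registry
// that maps Go types to the kind they represent.
type ToyScheme struct {
	kinds map[reflect.Type]GVK
}

func NewToyScheme() *ToyScheme {
	return &ToyScheme{kinds: make(map[reflect.Type]GVK)}
}

// AddKnownType registers the Go type of obj under the given GVK.
func (s *ToyScheme) AddKnownType(gvk GVK, obj interface{}) {
	s.kinds[reflect.TypeOf(obj)] = gvk
}

// KindFor looks up the GVK for obj and fails for unregistered types.
func (s *ToyScheme) KindFor(obj interface{}) (GVK, error) {
	gvk, ok := s.kinds[reflect.TypeOf(obj)]
	if !ok {
		return GVK{}, fmt.Errorf("no kind registered for type %T", obj)
	}
	return gvk, nil
}

// Pod and Cluster are placeholders for a built-in and a custom resource.
type Pod struct{}
type Cluster struct{}

func main() {
	scheme := NewToyScheme()
	scheme.AddKnownType(GVK{Version: "v1", Kind: "Pod"}, &Pod{})

	// Cluster has not been registered yet, so the lookup fails.
	if _, err := scheme.KindFor(&Cluster{}); err != nil {
		fmt.Println("before registration:", err)
	}

	scheme.AddKnownType(GVK{Group: "example.com", Version: "v1beta1", Kind: "Cluster"}, &Cluster{})
	gvk, _ := scheme.KindFor(&Cluster{})
	fmt.Println("after registration:", gvk.Kind)
}
```

In the real project, a missing AddToScheme() call typically surfaces the same way: the Client returns an error saying that no kind is registered for the type.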
1.2 Manager
The Manager holds the dependencies common to all Controllers; through it, the Controllers share the Cache and the Client.
The scaffolded main function creates a Manager, and the Controllers you implement later are registered with it.
    // ...
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "67295132.shiori.cn",
    })
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }
Once all Controllers are registered with the Manager, the end of the main function starts the Manager, which in turn starts every Controller.
    setupLog.Info("starting manager")
    // The context is cancelled on SIGINT/SIGTERM, which stops the Manager.
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
2 Core Objects
2.1 Reconciler
A Reconciler holds the resource-handling logic that we implement ourselves. Its interface is defined as follows:
    type Reconciler interface {
        // Reconcile performs a full reconciliation for the object referred to by the Request.
        // The Controller will requeue the Request to be processed again if an error is non-nil or
        // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
        Reconcile(context.Context, Request) (Result, error)
    }
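Once a Reconciler is registered with a Controller, Reconcile() is called whenever a watched resource changes.
When an API resource is created with KubeBuilder, the corresponding Reconciler object is generated automatically: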
    type ClusterReconciler struct {
        client.Client
        Scheme *runtime.Scheme
    }
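The embedded Client is the Kubernetes client. It can Get / List / Update / Patch resources and also exposes the interface for updating Status.
Creating, updating, or deleting a resource all trigger Reconcile(), and the returned Result decides whether the request is requeued. The Request parameter carries only the resource's Namespace/Name, so the resource object itself has to be fetched through the Client.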
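    func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        var cluster v1.Cluster
        if err := r.Get(ctx, req.NamespacedName, &cluster); err != nil {
            return ctrl.Result{}, client.IgnoreNotFound(err) // the object may already be gone
        }
        // ... reconcile the cluster here
        return ctrl.Result{}, nil
    }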
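The requeue rule described in the interface comment can be illustrated with a small standard-library model. This is only a sketch under simplifying assumptions: Request, Result, ReconcileFunc, and drain are hypothetical stand-ins, and a real Controller uses a rate-limited work queue rather than a plain slice. The point is the rule itself: a request goes back on the queue when Reconcile returns an error or sets Result.Requeue.

```go
package main

import (
	"errors"
	"fmt"
)

// Request and Result are simplified stand-ins for the controller-runtime
// types of the same name.
type Request struct{ Namespace, Name string }
type Result struct{ Requeue bool }

// ReconcileFunc mirrors the shape of Reconcile without the context argument.
type ReconcileFunc func(Request) (Result, error)

// drain processes the queue until it is empty or maxCalls is reached and
// returns the number of reconcile calls. A request is put back on the queue
// when reconcile returns an error or Result.Requeue is true.
func drain(queue []Request, reconcile ReconcileFunc, maxCalls int) int {
	calls := 0
	for len(queue) > 0 && calls < maxCalls {
		req := queue[0]
		queue = queue[1:]
		calls++
		res, err := reconcile(req)
		if err != nil || res.Requeue {
			queue = append(queue, req) // retry later
		}
	}
	return calls
}

func main() {
	attempts := 0
	// flaky fails once, then asks for one more pass, then succeeds.
	flaky := func(req Request) (Result, error) {
		attempts++
		switch attempts {
		case 1:
			return Result{}, errors.New("transient error")
		case 2:
			return Result{Requeue: true}, nil
		default:
			return Result{}, nil
		}
	}
	calls := drain([]Request{{Namespace: "default", Name: "demo"}}, flaky, 10)
	fmt.Println("reconcile calls:", calls)
}
```

Because the same key can be processed many times, a Reconcile implementation is usually written to be idempotent: it compares the desired state with the observed state instead of reacting to a specific event.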
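2.2 Controller
Once a Reconciler is written, it has to be injected into a Controller, which defines how resources are watched. The Controller is in turn registered with the Manager, which lets it share the Manager's Cache, Client, and other resources.
Registering a Controller generally looks like this: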
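    import ctrl "sigs.k8s.io/controller-runtime"

    // pdt and p are predicates, r is the Reconciler, and
    // common.DefaultRateLimiter() is a project helper that is not shown here.
    ctrl.NewControllerManagedBy(mgr).
        For(&v1.Cluster{}, builder.WithPredicates(pdt)).
        WithOptions(controller.Options{RateLimiter: common.DefaultRateLimiter()}).
        WithEventFilter(p).
        Complete(r)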
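- Call NewControllerManagedBy to create a Controller Builder based on the Manager.
- Configure the Builder around how Reconcile should be triggered.
- Call Complete/Build to finish the registration.
The Builder supports the following groups of methods:
- Resource watching
  - For - watches the create / delete / update events of the resource.
  - Owns - watches objects owned by the resource; when an owned object is created, deleted, or updated, Reconcile is triggered for its owner.
  - Watches - the lower-level Watch function, which lets you customize the watched events and the filter conditions.
- Configuration
- Finishing the build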
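The difference between For and Owns comes down to which key ends up in the reconcile queue. The standard-library sketch below models only that mapping. Object, Request, and requestsFor are hypothetical names, each object carries at most one owner, and the model ignores which owned kinds were actually passed to Owns, so treat it as an illustration rather than the Builder's real behavior.

```go
package main

import "fmt"

// Object is a hypothetical, simplified resource: a kind, a name, and an
// optional owner (real Kubernetes objects can have several owner references).
type Object struct {
	Kind, Namespace, Name string
	OwnerKind, OwnerName  string
}

// Request identifies the object to reconcile.
type Request struct{ Namespace, Name string }

// requestsFor models how an event is turned into a reconcile key:
// an event on the primary kind enqueues the object itself (For), while an
// event on an object owned by the primary kind enqueues its owner (Owns).
// Events on unrelated objects are ignored.
func requestsFor(primaryKind string, obj Object) []Request {
	switch {
	case obj.Kind == primaryKind: // For
		return []Request{{Namespace: obj.Namespace, Name: obj.Name}}
	case obj.OwnerKind == primaryKind: // Owns
		return []Request{{Namespace: obj.Namespace, Name: obj.OwnerName}}
	default:
		return nil
	}
}

func main() {
	events := []Object{
		{Kind: "Cluster", Namespace: "default", Name: "demo"},
		{Kind: "Pod", Namespace: "default", Name: "demo-0", OwnerKind: "Cluster", OwnerName: "demo"},
		{Kind: "Pod", Namespace: "default", Name: "unrelated"},
	}
	for _, e := range events {
		fmt.Printf("%s/%s -> %v\n", e.Kind, e.Name, requestsFor("Cluster", e))
	}
}
```

In both of the first two cases Reconcile receives the key of the Cluster, which is why the Reconciler only ever needs to know how to handle its primary resource.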
2.3 Manager
The Manager holds the objects shared by all Controllers, such as the Cache, Client, and Scheme.
    // Cluster provides various methods to interact with a cluster.
    type Cluster interface {
        // SetFields will set any dependencies on an object for which the object has implemented the inject
        // interface - e.g. inject.Client.
        // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
        SetFields(interface{}) error

        // GetConfig returns an initialized Config
        GetConfig() *rest.Config

        // GetScheme returns an initialized Scheme
        GetScheme() *runtime.Scheme

        // GetClient returns a client configured with the Config. This client may
        // not be a fully "direct" client -- it may read from a cache, for
        // instance. See Options.NewClient for more information on how the default
        // implementation works.
        GetClient() client.Client

        // GetFieldIndexer returns a client.FieldIndexer configured with the client
        GetFieldIndexer() client.FieldIndexer

        // GetCache returns a cache.Cache
        GetCache() cache.Cache

        // GetEventRecorderFor returns a new EventRecorder for the provided name
        GetEventRecorderFor(name string) record.EventRecorder

        // GetRESTMapper returns a RESTMapper
        GetRESTMapper() meta.RESTMapper

        // GetAPIReader returns a reader that will be configured to use the API server.
        // This should be used sparingly and only when the client does not fit your
        // use case.
        GetAPIReader() client.Reader

        // Start starts the cluster
        Start(ctx context.Context) error
    }

    // Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
    // A Manager is required to create Controllers.
    type Manager interface {
        // Cluster holds a variety of methods to interact with a cluster.
        cluster.Cluster

        // Add will set requested dependencies on the component, and cause the component to be
        // started when Start is called. Add will inject any dependencies for which the argument
        // implements the inject interface - e.g. inject.Client.
        // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
        // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
        Add(Runnable) error

        // Elected is closed when this manager is elected leader of a group of
        // managers, either because it won a leader election or because no leader
        // election was configured.
        Elected() <-chan struct{}

        // AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.
        // Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be
        // sensitive and shouldn't be exposed publicly.
        // If the simple path -> handler mapping offered here is not enough, a new http server/listener should be added as
        // Runnable to the manager via Add method.
        AddMetricsExtraHandler(path string, handler http.Handler) error

        // AddHealthzCheck allows you to add Healthz checker
        AddHealthzCheck(name string, check healthz.Checker) error

        // AddReadyzCheck allows you to add Readyz checker
        AddReadyzCheck(name string, check healthz.Checker) error

        // Start starts all registered Controllers and blocks until the context is cancelled.
        // Returns an error if there is an error starting any controller.
        //
        // If LeaderElection is used, the binary must be exited immediately after this returns,
        // otherwise components that need leader election might continue to run after the leader
        // lock was lost.
        Start(ctx context.Context) error

        // GetWebhookServer returns a webhook.Server
        GetWebhookServer() *webhook.Server

        // GetLogger returns this manager's logger.
        GetLogger() logr.Logger

        // GetControllerOptions returns controller global configuration options.
        GetControllerOptions() v1alpha1.ControllerConfigurationSpec
    }
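As a result, the Client or Cache used in each Reconciler can be obtained from the Manager.
Creating a Manager requires the cluster configuration loaded from kubeconfig (here via ctrl.GetConfigOrDie()), plus a set of options that control its behavior.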
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "2100d6c4.infra.pingcap.com",
    })
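After the Controllers are registered with the Manager, the Manager starts and stops them together. When the Manager starts, it populates the Cache for the first time and begins watching resources.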
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
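The startup sequence includes:
- Start the metrics and health endpoints.
- Start the Webhook server.
- Populate the Cache.
- Acquire the leader election lock and, once it succeeds, start the watches of each Controller.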
3 Webhook
KubeBuilder can generate a Webhook for a custom type through the kubebuilder create webhook command.
    kubebuilder create webhook --group batch --version v1 --kind CronJob --conversion
KubeBuilder generates the Webhook-related functions for the custom resource and registers the Webhook in the main function.
    // SetupWebhookWithManager registers the webhooks for CronJob with the Manager.
    func (r *CronJob) SetupWebhookWithManager(mgr ctrl.Manager) error {
        return ctrl.NewWebhookManagedBy(mgr).
            For(r).
            Complete()
    }

    func main() {
        // ... (mgr is created earlier in main)
        if err := (&batchv1.CronJob{}).SetupWebhookWithManager(mgr); err != nil {
            setupLog.Error(err, "unable to create webhook", "webhook", "CronJob")
            os.Exit(1)
        }
        // ...
    }
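3.1 Mutate
The Default() function generated by KubeBuilder (scaffolded when the webhook is created with the --defaulting flag) sets default values on the CR before it is written to etcd. It runs during the Mutating Webhook phase.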
    func (r *CronJob) Default() {
        if r.Spec.ConcurrencyPolicy == "" {
            r.Spec.ConcurrencyPolicy = AllowConcurrent
        }
        // ...
    }
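3.2 Validate
KubeBuilder also generates the Validate functions (scaffolded with the --programmatic-validation flag), which can validate Create / Update / Delete operations. They run during the Validating Webhook phase.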
    // ValidateCreate implements webhook.Validator so that a webhook is registered for this type.
    func (r *CronJob) ValidateCreate() error {
        return r.validateCronJob()
    }

    // ValidateUpdate implements webhook.Validator so that a webhook is registered for this type.
    func (r *CronJob) ValidateUpdate(old runtime.Object) error {
        return r.validateCronJob()
    }

    // ValidateDelete implements webhook.Validator so that a webhook is registered for this type.
    func (r *CronJob) ValidateDelete() error {
        return nil
    }
Note
ValidateDelete is triggered when the APIServer Delete API is called, so it runs before the deletion timestamp (metadata.deletionTimestamp) is set on the CR.
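Since the mutating phase runs before the validating phase, the validation logic can rely on the defaults already being filled in. The standard-library sketch below shows that ordering on a trimmed-down type. The CronJobSpec fields, the value of AllowConcurrent, the body of validateCronJob, and the admit helper are all assumptions made for illustration, because the source does not show them.

```go
package main

import (
	"errors"
	"fmt"
)

// CronJobSpec is a trimmed-down, hypothetical version of the spec used above.
type CronJobSpec struct {
	Schedule          string
	ConcurrencyPolicy string
}

// CronJob is a simplified stand-in for the custom resource type.
type CronJob struct{ Spec CronJobSpec }

// AllowConcurrent is an assumed value for the default policy.
const AllowConcurrent = "Allow"

// Default fills in unset fields, as in the mutating step.
func (r *CronJob) Default() {
	if r.Spec.ConcurrencyPolicy == "" {
		r.Spec.ConcurrencyPolicy = AllowConcurrent
	}
}

// validateCronJob is an assumed implementation of the shared check; it only
// verifies that the required fields are set.
func (r *CronJob) validateCronJob() error {
	if r.Spec.Schedule == "" {
		return errors.New("spec.schedule must not be empty")
	}
	if r.Spec.ConcurrencyPolicy == "" {
		return errors.New("spec.concurrencyPolicy must not be empty")
	}
	return nil
}

// admit applies the two steps in order: defaulting first, then validation.
func admit(job *CronJob) error {
	job.Default()
	return job.validateCronJob()
}

func main() {
	ok := &CronJob{Spec: CronJobSpec{Schedule: "*/1 * * * *"}}
	fmt.Println(admit(ok), ok.Spec.ConcurrencyPolicy)

	bad := &CronJob{}
	fmt.Println(admit(bad))
}
```

The first object passes even though it never set a concurrency policy, because defaulting ran before validation; the second is rejected because no default exists for the schedule.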
4 controller-gen
References