导读
此篇代码分析是kubesphere-controller-manager模块基础之一。kubesphere-controller使用的库为k8s/client-go和controller-runtime。example-controller是client-go官方示例代码。
Example-controller
代码功能:实时监控资源类型为foo的资源。
foo类型资源功能:创建一个delopment资源,该资源镜像为nginx
example-controller源码解析
代码结构图
example-controller使用go mod。
├── CONTRIBUTING.md
├── Godeps # 依赖的文件的信息
├── LICENSE
├── OWNERS
├── README.md
├── SECURITY_CONTACTS
├── artifacts # yaml资源构建脚本
│ └── examples
│ ├── crd-status-subresource.yaml
│ ├── crd-validation.yaml
│ ├── crd.yaml # crd资源,申请注册Foo资源类型
│ └── example-foo.yaml
├── code-of-conduct.md
├── controller.go # 监控foo资源,对foo资源处理。被main调用
├── controller_test.go # controller的测试
├── docs
│ ├── controller-client-go.md
│ └── images
│ └── client-go-controller-interaction.jpeg
├── go.mod # 本项目使用go mod
├── go.sum
├── hack # 代码自动生成脚本
│ ├── boilerplate.go.txt
│ ├── custom-boilerplate.go.txt
│ ├── tools.go
│ ├── update-codegen.sh
│ └── verify-codegen.sh
├── main.go # example-controller的入口
└── pkg # example-controller的依赖
├── apis # foo资源的数据类型
│ └── samplecontroller
│ ├── register.go
│ └── v1alpha1
│ ├── doc.go
│ ├── register.go
│ ├── types.go
│ └── zz_generated.deepcopy.go # 由代码自动生成
├── generated # 这个文件下面代码自动生成
│ ├── clientset
│ ├── informers
│ └── listers
└── signals # 信号,关闭程序的信号
├── signal.go
├── signal_posix.go
└── signal_windows.go
代码解析
调用链简约版本

调用链复杂版本

informer和clientset都是对api-server的访问。设置好informer后,informer将监控到的事件放入队列(在informer上会设置回调函数,回调函数将数据放入队列)。另起一个线程不断的访问队列,有数据就拿出来给syncHandler处理
注:隐藏了cache部分,cache:informer将api server上资源信息缓存到本地。client-go的informer是比较重要的部分
Main 代码
// masterURL:The address of the Kubernetes API server.优先级比kubeconfig高
// kubeconfig:Path to a kubeconfig. 默认为~/.kube/config
// cfg 为rest config
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
// 根据配置生成k8s的clientset和example-controller的clientset。
kubeClient, err := kubernetes.NewForConfig(cfg)
exampleClient, err := clientset.NewForConfig(cfg)
// 使用clientset构造Informer。使用Informer替代clientset对k8s资源的访问。
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
// 初始化controller结构体。创建&&初始化:example和Deployments Informer。
// 在controller代码将会具体解析NewController
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
// 运行 kubeInformerFactory,exampleInformerFactory 中已注册的所有 Infomer
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)
// 运行Run。
// 1.不断的从workqueue获取foo类型资源
// 2.调用syncHandle处理foo类型资源
controller.Run(2, stopCh)
为什么使用informer替代部分clientset对api-server访问。
我们的controller监控foo类型。所以controller需要一直访问k8s集群。这就导致对k8s集群访问次数增加。为了解决这个问题。informer使用了LisWatch策略。informer使用长链接来监控资源。一旦资源变化就将资源缓存下来。
controller代码
controller 结构体
type Controller struct {
// kubernetes的客户端(clientset)
kubeclientset kubernetes.Interface
// sampleclientset 我们自定义资源的客户端(clientset)
sampleclientset clientset.Interface
// Lister和Synced属于informer
deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced
foosLister listers.FooLister
foosSynced cache.InformerSynced
// 工作队列
workqueue workqueue.RateLimitingInterface
//记录器是一个事件记录器,用于将事件资源记录到Kubernetes API。
recorder record.EventRecorder
}
NewController方法
// 设置fooInformer和deploymentInformer事件触发器,
// enqueueFoo将对象放入到workqueue队列中
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
deploymentInformer.Informer().AddEventHandler(....)
syncHandle:根据workqueue队列中key,处理事件。
//相关调用链:Run-->runWorker--->processNextWorkItem--->syncHandler(workqueue.Get())
func (c *Controller) syncHandler(key string) error {
// 从cache中获取namespace, name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
// 从foosLister获取foo
foo, err := c.foosLister.Foos(namespace).Get(name)
deploymentName := foo.Spec.DeploymentName
// 从Foo.spec获得deployment名称
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
// 如果没有该资源,就创建
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset
.AppsV1()
.Deployments(foo.Namespace)
.Create(newDeployment(foo))
}
...
// 下面两个if是判断Deployment是否符合要求。
// 1.deployment是否被foo掌控
// 2.deployment的Replicas是否符合要求
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
}
...
// 更新foo状态
err = c.updateFooStatus(foo, deployment)
if err != nil {
return err
}
// 记录
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}