A Look into the kubefed Source Code: the ClusterController

kubefed is scaffolded with kubebuilder v1, so its source code is not that hard to read. Let's start with how kubefed manages its member clusters.
1. The cluster resource in kubefed

First, let's see what the cluster resource defined by kubefed looks like:
```yaml
apiVersion: core.kubefed.io/v1beta1
kind: KubeFedCluster
metadata:
  creationTimestamp: "2020-05-31T09:31:20Z"
  generation: 1
  name: center
  namespace: kube-federation-system
  resourceVersion: "23933"
  selfLink: /apis/core.kubefed.io/v1beta1/namespaces/kube-federation-system/kubefedclusters/center
  uid: cc9d5756-c65e-4f8d-bbfd-96ca081961d4
spec:
  apiEndpoint: https://10.10.10.10:8443
  caBundle: xxaa
  secretRef:
    name: center-q4k9n
status:
  conditions:
  - lastProbeTime: "2020-05-31T15:34:23Z"
    lastTransitionTime: "2020-05-31T15:33:52Z"
    message: /healthz responded with ok
    reason: ClusterReady
    status: "True"
    type: Ready
  region: ""
```
An object like this is typically created for you when a member cluster is joined with kubefedctl join. The API itself is defined in /pkg/apis/core/kubefedcluster_types.go:
```go
type KubeFedClusterSpec struct {
    APIEndpoint            string               `json:"apiEndpoint"`
    CABundle               []byte               `json:"caBundle,omitempty"`
    SecretRef              LocalSecretReference `json:"secretRef"`
    DisabledTLSValidations []TLSValidation      `json:"disabledTLSValidations,omitempty"`
}

type KubeFedClusterStatus struct {
    Conditions []ClusterCondition `json:"conditions"`
    Zones      []string           `json:"zones,omitempty"`
    Region     *string            `json:"region,omitempty"`
}
```
The ClusterCondition type referenced there is defined as follows:
```go
type ClusterCondition struct {
    Type               common.ClusterConditionType `json:"type"`
    Status             apiv1.ConditionStatus       `json:"status"`
    LastProbeTime      metav1.Time                 `json:"lastProbeTime"`
    LastTransitionTime *metav1.Time                `json:"lastTransitionTime,omitempty"`
    Reason             *string                     `json:"reason,omitempty"`
    Message            *string                     `json:"message,omitempty"`
}
```
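To make these types concrete, here is a minimal, hypothetical helper (not kubefed's own code) that assembles a KubeFedClusterStatus with a single Ready condition mirroring the YAML shown earlier. The import paths and the common.ClusterReady constant are assumed from the sigs.k8s.io/kubefed module layout:

```go
package main

import (
    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "sigs.k8s.io/kubefed/pkg/apis/core/common"
    fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
)

// newReadyStatus is a hypothetical helper (not part of kubefed) that builds a
// KubeFedClusterStatus carrying one Ready=True condition, matching the
// KubeFedCluster YAML example above.
func newReadyStatus() fedv1b1.KubeFedClusterStatus {
    now := metav1.Now()
    reason := "ClusterReady"
    message := "/healthz responded with ok"
    return fedv1b1.KubeFedClusterStatus{
        Conditions: []fedv1b1.ClusterCondition{{
            Type:               common.ClusterReady, // constant name assumed from kubefed's common package
            Status:             apiv1.ConditionTrue,
            LastProbeTime:      now,
            LastTransitionTime: &now,
            Reason:             &reason,
            Message:            &message,
        }},
    }
}
```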
2. The cluster resource client in kubefed

The cluster controller in kubefed is named kubefedcluster, and its source code lives in /pkg/controller/kubefedcluster.

To watch the status of the member clusters, kubefed defines a dedicated ClusterClient for listing/watching status information from multiple clusters, located in /pkg/controller/kubefedcluster/clusterclient.go:
```go
type ClusterClient struct {
    kubeClient  *kubeclientset.Clientset
    clusterName string
}
```
It is really just a client-go Clientset wrapped together with a cluster name, so initialization needs a little extra work: the connection details are obtained from the KubeFedCluster CR defined above:
```go
func NewClusterClientSet(c *fedv1b1.KubeFedCluster, client generic.Client, fedNamespace string, timeout time.Duration) (*ClusterClient, error) {
    clusterConfig, err := util.BuildClusterConfig(c, client, fedNamespace)
    if err != nil {
        return nil, err
    }
    clusterConfig.Timeout = timeout
    var clusterClientSet = ClusterClient{clusterName: c.Name}
    if clusterConfig != nil {
        clusterClientSet.kubeClient = kubeclientset.NewForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName)))
        if clusterClientSet.kubeClient == nil {
            return nil, nil
        }
    }
    return &clusterClientSet, nil
}
```
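For context, util.BuildClusterConfig (not shown here) resolves the secret referenced by spec.secretRef on the host cluster and turns the KubeFedCluster fields into a *rest.Config. The following is only a simplified sketch of the result it produces; the secret lookup is elided and the token is passed in directly, and the kubefed import path is assumed:

```go
import (
    restclient "k8s.io/client-go/rest"

    fedv1b1 "sigs.k8s.io/kubefed/pkg/apis/core/v1beta1"
)

// clusterRestConfigSketch is a hypothetical simplification of what
// util.BuildClusterConfig produces: a rest.Config assembled from the
// KubeFedCluster's apiEndpoint and caBundle, plus a bearer token that kubefed
// actually reads from the secret named by spec.secretRef (lookup elided here).
func clusterRestConfigSketch(c *fedv1b1.KubeFedCluster, token string) *restclient.Config {
    return &restclient.Config{
        Host:            c.Spec.APIEndpoint,
        BearerToken:     token,
        TLSClientConfig: restclient.TLSClientConfig{CAData: c.Spec.CABundle},
    }
}
```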
kubefed also gives ClusterClient a method for fetching the health status of its cluster:
```go
func (self *ClusterClient) GetClusterHealthStatus() (*fedv1b1.KubeFedClusterStatus, error) {
    // clusterStatus and the newCluster*Condition values are built earlier in
    // the full function body; that part is elided in this excerpt.
    body, err := self.kubeClient.DiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do().Raw()
    if err != nil {
        runtime.HandleError(errors.Wrapf(err, "Failed to do cluster health check for cluster %q", self.clusterName))
        clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterOfflineCondition)
        metrics.RegisterKubefedClusterTotal(metrics.ClusterOffline, self.clusterName)
    } else {
        if !strings.EqualFold(string(body), "ok") {
            metrics.RegisterKubefedClusterTotal(metrics.ClusterNotReady, self.clusterName)
            clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterNotReadyCondition, newClusterNotOfflineCondition)
        } else {
            metrics.RegisterKubefedClusterTotal(metrics.ClusterReady, self.clusterName)
            clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterReadyCondition)
        }
    }
    return &clusterStatus, err
}
```
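Putting the two functions together, a caller (in kubefed this is the cluster controller described below) could probe a member cluster roughly as follows. This is an illustrative sketch that assumes it sits in the same package as NewClusterClientSet; fedCluster, hostClient, and the 3-second timeout are placeholders, not kubefed's own code:

```go
// probeCluster is a hypothetical helper showing how the pieces above fit
// together: build a ClusterClient for one KubeFedCluster and run a single
// health probe against its /healthz endpoint.
func probeCluster(fedCluster *fedv1b1.KubeFedCluster, hostClient generic.Client, fedNamespace string) {
    clusterClient, err := NewClusterClientSet(fedCluster, hostClient, fedNamespace, 3*time.Second)
    if err != nil || clusterClient == nil {
        klog.Errorf("failed to build client for cluster %q: %v", fedCluster.Name, err)
        return
    }
    status, err := clusterClient.GetClusterHealthStatus()
    if err != nil {
        klog.Warningf("health check against %q failed: %v", fedCluster.Name, err)
    }
    // GetClusterHealthStatus always returns a status object, even on error.
    for _, cond := range status.Conditions {
        klog.Infof("cluster %q condition %s=%s", fedCluster.Name, cond.Type, cond.Status)
    }
}
```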
3. The cluster resource controller in kubefed

The actual controller code lives in /pkg/controller/kubefedcluster/controller.go. kubefed first defines a ClusterData struct that holds a cluster's client, its status, health-check bookkeeping, and the cached object:
```go
type ClusterData struct {
    clusterKubeClient *ClusterClient
    clusterStatus     *fedv1b1.KubeFedClusterStatus
    resultRun         int64
    cachedObj         *fedv1b1.KubeFedCluster
}
```
Next, let's look at the controller struct itself:
```go
type ClusterController struct {
    client                   genericclient.Client
    clusterHealthCheckConfig *util.ClusterHealthCheckConfig
    mu                       sync.RWMutex
    clusterDataMap           map[string]*ClusterData
    clusterController        cache.Controller
    fedNamespace             string
    eventRecorder            record.EventRecorder
}
```
The util.ClusterHealthCheckConfig used here looks like this:
```go
type ClusterHealthCheckConfig struct {
    Period           time.Duration
    FailureThreshold int64
    SuccessThreshold int64
    Timeout          time.Duration
}
```
This struct configures the controller's health-check period, the failure and success thresholds, and the probe timeout.
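For illustration, a configuration like the one below would make the controller probe every member cluster every 10 seconds and only flip a cluster's state after the corresponding run of identical results. The numbers are example values only, not necessarily kubefed's flag defaults:

```go
// Illustrative values only; kubefed fills this struct from its controller
// manager flags, whose defaults may differ from these numbers.
healthCheckConfig := &util.ClusterHealthCheckConfig{
    Period:           10 * time.Second, // probe every member cluster this often
    FailureThreshold: 3,                // consecutive failed probes before the status flips to unhealthy
    SuccessThreshold: 1,                // consecutive successful probes before it flips back to healthy
    Timeout:          3 * time.Second,  // per-probe client timeout
}
```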
Next, let's look at the function that creates the controller; it reads very much like a hand-written "native" controller:
```go
func newClusterController(config *util.ControllerConfig, clusterHealthCheckConfig *util.ClusterHealthCheckConfig) (*ClusterController, error) {
    kubeConfig := restclient.CopyConfig(config.KubeConfig)
    // Note: the construction of the generic `client` and the event-recorder
    // setup (which uses kubeClient below) are elided in this excerpt.
    cc := &ClusterController{
        client:                   client,
        clusterHealthCheckConfig: clusterHealthCheckConfig,
        clusterDataMap:           make(map[string]*ClusterData),
        fedNamespace:             config.KubeFedNamespace,
    }
    kubeClient := kubeclient.NewForConfigOrDie(kubeConfig)

    var err error
    _, cc.clusterController, err = util.NewGenericInformerWithEventHandler(
        config.KubeConfig,
        config.KubeFedNamespace,
        &fedv1b1.KubeFedCluster{},
        util.NoResyncPeriod,
        &cache.ResourceEventHandlerFuncs{
            DeleteFunc: func(obj interface{}) {
                castObj, ok := obj.(*fedv1b1.KubeFedCluster)
                if !ok {
                    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
                    if !ok {
                        klog.Errorf("Couldn't get object from tombstone %#v", obj)
                        return
                    }
                    castObj, ok = tombstone.Obj.(*fedv1b1.KubeFedCluster)
                    if !ok {
                        klog.Errorf("Tombstone contained object that is not expected %#v", obj)
                        return
                    }
                }
                cc.delFromClusterSet(castObj)
            },
            AddFunc: func(obj interface{}) {
                castObj := obj.(*fedv1b1.KubeFedCluster)
                cc.addToClusterSet(castObj)
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                var clusterChanged bool
                cluster := newObj.(*fedv1b1.KubeFedCluster)
                cc.mu.Lock()
                clusterData, ok := cc.clusterDataMap[cluster.Name]
                if !ok || !equality.Semantic.DeepEqual(clusterData.cachedObj.Spec, cluster.Spec) ||
                    !equality.Semantic.DeepEqual(clusterData.cachedObj.ObjectMeta.Annotations, cluster.ObjectMeta.Annotations) ||
                    !equality.Semantic.DeepEqual(clusterData.cachedObj.ObjectMeta.Labels, cluster.ObjectMeta.Labels) {
                    clusterChanged = true
                }
                cc.mu.Unlock()
                if !clusterChanged {
                    return
                }
                cc.delFromClusterSet(cluster)
                cc.addToClusterSet(cluster)
            },
        },
    )
    return cc, err
}
```
The ClusterSet add/delete functions that appear here are simple: they just add entries to, or remove them from, the controller's ClusterData map:
```go
func (cc *ClusterController) delFromClusterSet(obj *fedv1b1.KubeFedCluster) {
    cc.mu.Lock()
    defer cc.mu.Unlock()
    klog.V(1).Infof("ClusterController observed a cluster deletion: %v", obj.Name)
    delete(cc.clusterDataMap, obj.Name)
}

func (cc *ClusterController) addToClusterSet(obj *fedv1b1.KubeFedCluster) {
    cc.mu.Lock()
    defer cc.mu.Unlock()
    clusterData := cc.clusterDataMap[obj.Name]
    if clusterData != nil && clusterData.clusterKubeClient != nil {
        return
    }

    klog.V(1).Infof("ClusterController observed a new cluster: %v", obj.Name)

    restClient, err := NewClusterClientSet(obj, cc.client, cc.fedNamespace, cc.clusterHealthCheckConfig.Timeout)
    if err != nil || restClient == nil {
        cc.RecordError(obj, "MalformedClusterConfig", errors.Wrap(err, "The configuration for this cluster may be malformed"))
        return
    }
    cc.clusterDataMap[obj.Name] = &ClusterData{clusterKubeClient: restClient, cachedObj: obj.DeepCopy()}
}
```
From this we can also see that the lock in the controller exists solely to keep access to the ClusterData map concurrency-safe (one might wonder why a concurrency-safe map such as sync.Map was not used directly; a rough sketch of that alternative follows).
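For comparison, here is a rough sketch of what the sync.Map alternative alluded to above could look like, assuming it lives in the same package. The helper names are hypothetical; values must be type-asserted back to *ClusterData on every read, which is one plausible reason a plain map guarded by sync.RWMutex was kept:

```go
// Rough sketch of the sync.Map alternative; not kubefed code.
var clusterDataBySyncMap sync.Map // effectively map[string]*ClusterData

func addToClusterSetAlt(obj *fedv1b1.KubeFedCluster, data *ClusterData) {
    clusterDataBySyncMap.Store(obj.Name, data)
}

func getClusterDataAlt(name string) (*ClusterData, bool) {
    v, ok := clusterDataBySyncMap.Load(name)
    if !ok {
        return nil, false
    }
    return v.(*ClusterData), true // type assertion needed on every read
}

func delFromClusterSetAlt(obj *fedv1b1.KubeFedCluster) {
    clusterDataBySyncMap.Delete(obj.Name)
}
```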
Finally, let's look at the controller's Run method. It is invoked by StartClusterController, which is itself only a thin wrapper around Run (a rough paraphrase of it follows the Run method below), so we go straight to the Run implementation:
```go
func (cc *ClusterController) Run(stopChan <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go cc.clusterController.Run(stopChan)
    go wait.Until(func() {
        if err := cc.updateClusterStatus(); err != nil {
            klog.Errorf("Error monitoring cluster status: %v", err)
        }
    }, cc.clusterHealthCheckConfig.Period, stopChan)
}
```
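As mentioned above, StartClusterController is only a thin wrapper around this: it builds the controller and hands it the stop channel. Roughly paraphrased (check the kubefed source for the exact version):

```go
// Rough paraphrase of StartClusterController, not a verbatim copy: construct
// the cluster controller and start it with the given stop channel.
func StartClusterController(config *util.ControllerConfig, clusterHealthCheckConfig *util.ClusterHealthCheckConfig, stopChan <-chan struct{}) error {
    controller, err := newClusterController(config, clusterHealthCheckConfig)
    if err != nil {
        return err
    }
    klog.Infof("Starting cluster controller")
    controller.Run(stopChan)
    return nil
}
```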
Now let's look at the cluster-status update functions it invokes:
```go
func (cc *ClusterController) updateClusterStatus() error {
    clusters := &fedv1b1.KubeFedClusterList{}
    err := cc.client.List(context.TODO(), clusters, cc.fedNamespace)
    if err != nil {
        return err
    }

    var wg sync.WaitGroup
    for _, obj := range clusters.Items {
        cc.mu.RLock()
        cluster := obj.DeepCopy()
        clusterData := cc.clusterDataMap[cluster.Name]
        cc.mu.RUnlock()
        if clusterData == nil {
            cc.addToClusterSet(cluster)
            cc.mu.RLock()
            clusterData = cc.clusterDataMap[cluster.Name]
            cc.mu.RUnlock()
            if clusterData == nil {
                klog.Warningf("Failed to retrieve stored data for cluster %s", cluster.Name)
                continue
            }
        }

        wg.Add(1)
        go cc.updateIndividualClusterStatus(cluster, clusterData, &wg)
    }

    wg.Wait()
    return nil
}

func (cc *ClusterController) updateIndividualClusterStatus(cluster *fedv1b1.KubeFedCluster, storedData *ClusterData, wg *sync.WaitGroup) {
    defer metrics.ClusterHealthStatusDurationFromStart(time.Now())

    clusterClient := storedData.clusterKubeClient

    currentClusterStatus, err := clusterClient.GetClusterHealthStatus()
    if err != nil {
        cc.RecordError(cluster, "RetrievingClusterHealthFailed", errors.Wrap(err, "Failed to retrieve health of the cluster"))
    }

    currentClusterStatus = thresholdAdjustedClusterStatus(currentClusterStatus, storedData, cc.clusterHealthCheckConfig)

    if utilfeature.DefaultFeatureGate.Enabled(features.CrossClusterServiceDiscovery) {
        currentClusterStatus = cc.updateClusterZonesAndRegion(currentClusterStatus, cluster, clusterClient)
    }

    storedData.clusterStatus = currentClusterStatus
    cluster.Status = *currentClusterStatus
    if err := cc.client.UpdateStatus(context.TODO(), cluster); err != nil {
        klog.Warningf("Failed to update the status of cluster %q: %v", cluster.Name, err)
    }

    wg.Done()
}
```
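The thresholdAdjustedClusterStatus call above is where FailureThreshold, SuccessThreshold, and ClusterData.resultRun come into play: a freshly probed status only replaces the stored one after the same verdict has been observed enough times in a row. The following is a simplified sketch of that debouncing idea, not kubefed's actual implementation (which inspects the Ready/Offline conditions in more detail); thresholdAdjustedStatusSketch, isReadySketch, and the common.ClusterReady constant are assumptions:

```go
// Simplified sketch of the debouncing behind thresholdAdjustedClusterStatus;
// NOT the actual kubefed implementation. resultRun counts how many probes in
// a row produced the same verdict; the stored status is only replaced once
// the relevant threshold is reached.
func thresholdAdjustedStatusSketch(newStatus *fedv1b1.KubeFedClusterStatus, stored *ClusterData, cfg *util.ClusterHealthCheckConfig) *fedv1b1.KubeFedClusterStatus {
    if stored.clusterStatus == nil {
        stored.resultRun = 1
        return newStatus // first probe: accept whatever we observed
    }
    if isReadySketch(newStatus) == isReadySketch(stored.clusterStatus) {
        stored.resultRun++ // same verdict as last time: extend the run
    } else {
        stored.resultRun = 1 // verdict changed: start counting again
    }
    threshold := cfg.FailureThreshold
    if isReadySketch(newStatus) {
        threshold = cfg.SuccessThreshold
    }
    if stored.resultRun < threshold {
        return stored.clusterStatus // not confirmed often enough yet: keep the old status
    }
    return newStatus
}

// isReadySketch reports whether a Ready=True condition is present; the
// common.ClusterReady constant name is assumed from kubefed's common package.
func isReadySketch(status *fedv1b1.KubeFedClusterStatus) bool {
    for _, c := range status.Conditions {
        if c.Type == common.ClusterReady && c.Status == apiv1.ConditionTrue {
            return true
        }
    }
    return false
}
```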
That covers the basic workflow of the kubefed cluster controller: external code only needs to call StartClusterController to put it to work. In essence, the cluster controller watches for changes to KubeFedCluster resource objects, uses the fields of each KubeFedCluster to talk to that cluster's API server, and caches the result as ClusterData for the other controllers to use when they react to changes in federated resources. In one sentence: it manages the communication clients for every cluster in the federation, for later consumption by the other controllers.