Funky's NoteBook

Kubefed - ClusterController

字数统计: 2,358阅读时长: 12 min
2020/06/01 Share

透视 kubefed 源码 —— ClusterController

kubefed 其实是用 kubebuilder v1 脚手架做的,所以源代码看起来不是那么困难,首先来看看kubefed的集群管理是怎么做到的。

1.kubefed 中的集群资源

先来看看 kubefed 里定义的集群资源长什么样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
apiVersion: core.kubefed.io/v1beta1
kind: KubeFedCluster
metadata:
creationTimestamp: "2020-05-31T09:31:20Z"
generation: 1
name: center
namespace: kube-federation-system
resourceVersion: "23933"
selfLink: /apis/core.kubefed.io/v1beta1/namespaces/kube-federation-system/kubefedclusters/center
uid: cc9d5756-c65e-4f8d-bbfd-96ca081961d4
spec:
apiEndpoint: https://10.10.10.10:8443
caBundle: xxaa
secretRef:
name: center-q4k9n
status:
conditions:
- lastProbeTime: "2020-05-31T15:34:23Z"
lastTransitionTime: "2020-05-31T15:33:52Z"
message: /healthz responded with ok
reason: ClusterReady
status: "True"
type: Ready
region: ""

这个API其实被定义在 /pkg/apis/core/kubefedcluster_types.go 里:

1
2
3
4
5
6
7
8
9
10
11
12
13
// KubeFedClusterSpec defines the desired state of KubeFedCluster
type KubeFedClusterSpec struct {
APIEndpoint string `json:"apiEndpoint"`
CABundle []byte `json:"caBundle,omitempty"`
SecretRef LocalSecretReference `json:"secretRef"`
DisabledTLSValidations []TLSValidation `json:"disabledTLSValidations,omitempty"`
}

type KubeFedClusterStatus struct {
Conditions []ClusterCondition `json:"conditions"`
Zones []string `json:"zones,omitempty"`
Region *string `json:"region,omitempty"`
}

其中,ClusterCondition 的定义内容如下:

1
2
3
4
5
6
7
8
9
10
11
type ClusterCondition struct {
// Type of cluster condition, Ready or Offline.
Type common.ClusterConditionType `json:"type"`
// Status of the condition, one of True, False, Unknown.
Status apiv1.ConditionStatus `json:"status"`
// Last time the condition was checked.
LastProbeTime metav1.Time `json:"lastProbeTime"`
LastTransitionTime *metav1.Time `json:"lastTransitionTime,omitempty"`
Reason *string `json:"reason,omitempty"`
Message *string `json:"message,omitempty"`
}

2. kubefed 中的集群资源 Client

kubefed 中的集群控制器是名称是 kubefedcluster ,源代码位于 /pkg/controller/kubefedcluser

这里为了用于监听不同集群的状态,kubefed 专门定义了一套ClusterClient用于 list/watch 多个集群的状态信息,位于/pkg/controller/kubefedcluser/clusterclient.go

1
2
3
4
5
// particular KubeFedCluster.
type ClusterClient struct {
kubeClient *kubeclientset.Clientset
clusterName string
}

其实就是在 go client 的基础上封装了一个集群名称,所以初始化的时候可能需要多做一些处理,通过从上面定义的 kubefedcluster CR 获得 相关信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func NewClusterClientSet(c *fedv1b1.KubeFedCluster, client generic.Client, fedNamespace string, timeout time.Duration) (*ClusterClient, error) {
// 这里的 BuildClusterConfig 主要用于从CR中提取相关的配置参数,并生成
// 一个 rest client
// BuildClusterConfig 详细代码位于:
// pkg/controller/util/cluster_util.go
clusterConfig, err := util.BuildClusterConfig(c, client, fedNamespace)
if err != nil {
return nil, err
}
clusterConfig.Timeout = timeout
var clusterClientSet = ClusterClient{clusterName: c.Name}
if clusterConfig != nil {
clusterClientSet.kubeClient = kubeclientset.NewForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName)))
if clusterClientSet.kubeClient == nil {
return nil, nil
}
}
return &clusterClientSet, nil
}

kubefed 也给 clusterclient 定义了获取集群状态的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (self *ClusterClient) GetClusterHealthStatus() (*fedv1b1.KubeFedClusterStatus, error) {
//... 上面一圈状态定义,TL,NR
// 通过 REST client 获取集群健康状态
body, err := self.kubeClient.DiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do().Raw()
if err != nil {
// 如果可以通信,就把集群的 condition 字段增加数据
runtime.HandleError(errors.Wrapf(err, "Failed to do cluster health check for cluster %q", self.clusterName))
clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterOfflineCondition)
metrics.RegisterKubefedClusterTotal(metrics.ClusterOffline, self.clusterName)
} else {
if !strings.EqualFold(string(body), "ok") {
// 可以通信就把集群的 condition 字段增加数据
metrics.RegisterKubefedClusterTotal(metrics.ClusterNotReady, self.clusterName)
clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterNotReadyCondition, newClusterNotOfflineCondition)
} else {
metrics.RegisterKubefedClusterTotal(metrics.ClusterReady, self.clusterName)
clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterReadyCondition)
}
}

return &clusterStatus, err
}

3. kubefed 中的集群资源控制器

/pkg/controller/kubefedcluser/controller.go 里,我们找到的真正的控制器代码,kubefed首先定义了一个集群数据结构体,用于存放集群状态、健康监测探针等变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
type ClusterData struct {
// clusterKubeClient is the kube client for the cluster.
clusterKubeClient *ClusterClient

// clusterStatus is the cluster status as of last sampling.
clusterStatus *fedv1b1.KubeFedClusterStatus

// How many times in a row the probe has returned the same result.
resultRun int64

// cachedObj holds the last observer object from apiserver
cachedObj *fedv1b1.KubeFedCluster
}

我们接下来再看看 控制器结构体有什么不一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type ClusterController struct {
client genericclient.Client

// 增加了一个健康监测配置
clusterHealthCheckConfig *util.ClusterHealthCheckConfig

mu sync.RWMutex
// 增加了一个hashmap 用于存储多个节点的集群状态信息
clusterDataMap map[string]*ClusterData

// 定义了一个集群控制器(基于cache controller) 用于注册事件回调
clusterController cache.Controller

//用于记录kubefed 在中心集群的那个 ns 里
// fedNamespace is the name of the namespace containing
// KubeFedCluster resources and their associated secrets.
fedNamespace string

eventRecorder record.EventRecorder
}

这里的 util.ClusterHealthCheckConfig 长这个样子:

1
2
3
4
5
6
type ClusterHealthCheckConfig struct {
Period time.Duration
FailureThreshold int64
SuccessThreshold int64
Timeout time.Duration
}

这里,该结构体用于设置控制器的检测周期,连接数和超时时间。

接下来我们看看创建控制器的函数,这个函数很“原生控制器“:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func newClusterController(config *util.ControllerConfig, clusterHealthCheckConfig *util.ClusterHealthCheckConfig) (*ClusterController, error) {
// 省略定义, TL,NR
kubeConfig := restclient.CopyConfig(config.KubeConfig)
cc := &ClusterController{
client: client,
clusterHealthCheckConfig: clusterHealthCheckConfig,
clusterDataMap: make(map[string]*ClusterData),
fedNamespace: config.KubeFedNamespace,
}

//...
kubeClient := kubeclient.NewForConfigOrDie(kubeConfig)

var err error
// 创建一个Informer 用于监听 KubeFedCluster 的变化
_, cc.clusterController, err = util.NewGenericInformerWithEventHandler(
config.KubeConfig,
config.KubeFedNamespace,
&fedv1b1.KubeFedCluster{},
util.NoResyncPeriod,
// 定义了监听资源变化的处理方法
&cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
// go 语言断言处理,直接转换为 KubeFedCluster 资源对象
castObj, ok := obj.(*fedv1b1.KubeFedCluster)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
castObj, ok = tombstone.Obj.(*fedv1b1.KubeFedCluster)
if !ok {
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
return
}
}
// 执行删除对象处理
cc.delFromClusterSet(castObj)
},
AddFunc: func(obj interface{}) {
castObj := obj.(*fedv1b1.KubeFedCluster)
// 执行增加对象处理
cc.addToClusterSet(castObj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
var clusterChanged bool
cluster := newObj.(*fedv1b1.KubeFedCluster)
cc.mu.Lock()
// 更新下控制器的集群数据状态
clusterData, ok := cc.clusterDataMap[cluster.Name]
// 缓存和更新状态一致的判断,需要将 clusterChanged设为true
if !ok || !equality.Semantic.DeepEqual(clusterData.cachedObj.Spec, cluster.Spec) ||
!equality.Semantic.DeepEqual(clusterData.cachedObj.ObjectMeta.Annotations, cluster.ObjectMeta.Annotations) ||
!equality.Semantic.DeepEqual(clusterData.cachedObj.ObjectMeta.Labels, cluster.ObjectMeta.Labels) {
clusterChanged = true
}
cc.mu.Unlock()
// 否则 新版本和缓存一致,没有发生变化
if !clusterChanged {
return
}
// 执行更新对象处理,删了再加,简单粗暴
cc.delFromClusterSet(cluster)
cc.addToClusterSet(cluster)
},
},
)
return cc, err
}

这里面出现的 ClusterSet 增删函数其实简单,就是对控制器里的ClusterData map的增删而已:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (cc *ClusterController) delFromClusterSet(obj *fedv1b1.KubeFedCluster) {
cc.mu.Lock()
defer cc.mu.Unlock()
klog.V(1).Infof("ClusterController observed a cluster deletion: %v", obj.Name)
delete(cc.clusterDataMap, obj.Name)
}

func (cc *ClusterController) addToClusterSet(obj *fedv1b1.KubeFedCluster) {
cc.mu.Lock()
defer cc.mu.Unlock()
clusterData := cc.clusterDataMap[obj.Name]
// 这里比较奇怪,如果存在且集群client也存在就直接return了
// 个人感觉这里应该分析下问题
if clusterData != nil && clusterData.clusterKubeClient != nil {
return
}

klog.V(1).Infof("ClusterController observed a new cluster: %v", obj.Name)

// 给新集群创建一个集群Client
restClient, err := NewClusterClientSet(obj, cc.client, cc.fedNamespace, cc.clusterHealthCheckConfig.Timeout)
if err != nil || restClient == nil {
cc.RecordError(obj, "MalformedClusterConfig", errors.Wrap(err, "The configuration for this cluster may be malformed"))
return
}
// 把新 ClusterData 加到控制器里
cc.clusterDataMap[obj.Name] = &ClusterData{clusterKubeClient: restClient, cachedObj: obj.DeepCopy()}
}

这里也可以看出来,控制器里加的那个锁的目的其实就是用来保证ClusterData的并发安全(为啥不直接用并发安全字典)。

我们最后来看下 控制器的 run 方法,这个方法其实是被 StartClusterController 调用,StartClusterController 其实也只是对 run 方法的简单封装,这里就不详细看了,我们之间看 run 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
func (cc *ClusterController) Run(stopChan <-chan struct{}) {
defer utilruntime.HandleCrash()
// 调用 cache Controller 的 Run 方法
go cc.clusterController.Run(stopChan)
// 周期性监控集群状态
go wait.Until(func() {
if err := cc.updateClusterStatus(); err != nil {
klog.Errorf("Error monitoring cluster status: %v", err)
}
}, cc.clusterHealthCheckConfig.Period, stopChan)
}

这里我们在看看这里涉及的更新集群状态函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (cc *ClusterController) updateClusterStatus() error {
clusters := &fedv1b1.KubeFedClusterList{}
err := cc.client.List(context.TODO(), clusters, cc.fedNamespace)
if err != nil {
return err
}

var wg sync.WaitGroup
for _, obj := range clusters.Items {
cc.mu.RLock()
cluster := obj.DeepCopy()
clusterData := cc.clusterDataMap[cluster.Name]
cc.mu.RUnlock()
if clusterData == nil {
// 如果 clusterData 不存在
cc.addToClusterSet(cluster)
cc.mu.RLock()
clusterData = cc.clusterDataMap[cluster.Name]
cc.mu.RUnlock()
if clusterData == nil {
klog.Warningf("Failed to retrieve stored data for cluster %s", cluster.Name)
continue
}
}

wg.Add(1)
// 真正的更新集群状态的方法
go cc.updateIndividualClusterStatus(cluster, clusterData, &wg)
}

wg.Wait()
return nil
}

func (cc *ClusterController) updateIndividualClusterStatus(cluster *fedv1b1.KubeFedCluster,
storedData *ClusterData, wg *sync.WaitGroup) {
defer metrics.ClusterHealthStatusDurationFromStart(time.Now())

clusterClient := storedData.clusterKubeClient
// 更新集群状态
currentClusterStatus, err := clusterClient.GetClusterHealthStatus()
if err != nil {
cc.RecordError(cluster, "RetrievingClusterHealthFailed", errors.Wrap(err, "Failed to retrieve health of the cluster"))
}

currentClusterStatus = thresholdAdjustedClusterStatus(currentClusterStatus, storedData, cc.clusterHealthCheckConfig)

if utilfeature.DefaultFeatureGate.Enabled(features.CrossClusterServiceDiscovery) {
currentClusterStatus = cc.updateClusterZonesAndRegion(currentClusterStatus, cluster, clusterClient)
}

storedData.clusterStatus = currentClusterStatus
cluster.Status = *currentClusterStatus
// 更新资源对象的status字段
if err := cc.client.UpdateStatus(context.TODO(), cluster); err != nil {
klog.Warningf("Failed to update the status of cluster %q: %v", cluster.Name, err)
}

wg.Done()
}

至此,kubefed 集群控制器的基本工作流也就梳理的差不多了,其实外部想启用 kubefed 集群控制器也只需调用下 StartClusterController 函数就可以让集群控制器工作了。
其实,kubefed 集群控制器就是监控 KubeFedCluster 资源对象的变更,通过 KubeFedCluster 里的具体字段,尝试与那个集群的 API server 通信,并将这些数据作为
ClusterData 存起来,用于其他的控制器监听联邦资源变化的时候使用。一句话,管理联邦集群里所有的集群的通信 Client,后面有用(留给其他的控制器用)。

CATALOG
  1. 1. 透视 kubefed 源码 —— ClusterController
    1. 1.0.1. 1.kubefed 中的集群资源
    2. 1.0.2. 2. kubefed 中的集群资源 Client
    3. 1.0.3. 3. kubefed 中的集群资源控制器