Funky's NoteBook

Kubefed - FTC Controller

字数统计: 3,425阅读时长: 17 min
2020/06/02 Share

透视 kubefed 源码 —— FTC Controller

kubefed 中的 FederatedTypeConfig(FTC) 是用于联邦资源到 kubernetes 集群资源的映射关系,以便让联邦可以管控各个集群里的API资源对象。

和之前一样,我们先看看API资源长什么样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
apiVersion: core.kubefed.io/v1beta1
kind: FederatedTypeConfig
metadata:
creationTimestamp: "2020-05-31T09:36:32Z"
finalizers:
- core.kubefed.io/federated-type-config
generation: 1
name: deployments.apps
namespace: kube-federation-system
resourceVersion: "15506"
selfLink: /apis/core.kubefed.io/v1beta1/namespaces/kube-federation-system/federatedtypeconfigs/deployments.apps
uid: 762447f3-b7f7-46d0-8f88-e0d2b47c6181
spec:
federatedType:
group: types.kubefed.io
kind: FederatedDeployment
pluralName: federateddeployments
scope: Namespaced
version: v1beta1
propagation: Enabled
targetType:
group: apps
kind: Deployment
pluralName: deployments
scope: Namespaced
version: v1
status:
observedGeneration: 1
propagationController: Running
statusController: NotRunning

这里需要注意的几个地方就是 spec 对应的 federatedType 是定义的联邦资源,而 targetType 实际上是真正的 kubernetes 资源,这个资源可以是 kubernetes 的原生资源也可以是类似 istio 的 CR。

这里的 propagation 字段若设置为 Enable,则 kubefed 可以通过 federatedType 来控制各个集群里的对应的 targetType 的资源,这个我们在后面具体展开讲。我们继续看看源码里 API 的定义,代码位于/pkg/apis/core/v1beta1/federatedtypeconfig_types.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type FederatedTypeConfigSpec struct {
// The configuration of the target type. If not set, the pluralName and
// groupName fields will be set from the metadata.name of this resource. The
// kind field must be set.
TargetType APIResource `json:"targetType"`
// Whether or not propagation to member clusters should be enabled.
Propagation PropagationMode `json:"propagation"`
// Configuration for the federated type that defines (via
// template, placement and overrides fields) how the target type
// should appear in multiple cluster.
FederatedType APIResource `json:"federatedType"`
// Configuration for the status type that holds information about which type
// holds the status of the federated resource. If not provided, the group
// and version will default to those provided for the federated type api
// resource.
// +optional
StatusType *APIResource `json:"statusType,omitempty"`
// Whether or not Status object should be populated.
// +optional
StatusCollection *StatusCollectionMode `json:"statusCollection,omitempty"`
}

这里的 APIResource 其实就是 GVK 的组合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// APIResource defines how to configure the dynamic client for an API resource.
type APIResource struct {
// metav1.GroupVersion is not used since the json annotation of
// the fields enforces them as mandatory.
// Group of the resource.
// +optional
Group string `json:"group,omitempty"`
// Version of the resource.
Version string `json:"version"`
// Camel-cased singular name of the resource (e.g. ConfigMap)
Kind string `json:"kind"`
// Lower-cased plural name of the resource (e.g. configmaps). If
// not provided, it will be computed by lower-casing the kind and
// suffixing an 's'.
PluralName string `json:"pluralName"`
// Scope of the resource.
Scope apiextv1b1.ResourceScope `json:"scope"`
}

我们继续看看 FederatedTypeConfig 的状态结构体:

1
2
3
4
5
6
7
type FederatedTypeConfigStatus struct {
ObservedGeneration int64 `json:"observedGeneration"`
// PropagationController 追踪 sync controller 的状态.
PropagationController ControllerStatus `json:"propagationController"`
// StatusController 追踪 status controller 的状态.
StatusController *ControllerStatus `json:"statusController,omitempty"`
}

API看完了,根据 kubebuilder的 套路,我们直接去看控制器的源码吧!

控制器的源代码位于/pkg/controller/federatedtypeconfig/controller.go。我最开始,我们在API资源上就看见了 federatedtypeconfig 资源 (以下简称 FTC)里有个 finalizer 终结器,于是 在代码的最开头,kubefed 就声明了它:

1
const finalizer string = "core.kubefed.io/federated-type-config"

根据kubernetes 资源的约定写法,只有API资源的 终结器标签全部消失,该资源才能被删除,因此终结器其实是用于管理删除资源操作的标签,这玩意一般会和 DeletionTimestamp 配合删除我们的 CR。我们继续看看控制器结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Controller struct {
// Arguments to use when starting new controllers
controllerConfig *util.ControllerConfig

client genericclient.Client
// 一个 hashmap,map的 key 为 sync 控制器的 targetType
stopChannels map[string]chan struct{}
lock sync.RWMutex

// 存放 FederatedTypeConfig 对象
store cache.Store
// FederatedTypeConfig 对象的 Informer
controller cache.Controller

worker util.ReconcileWorker
}

这个控制器和之前看的集群控制器明显不一样,这个一看就是 kubebuilder 生成的,我们直接去看下调协函数吧~

1
func (c *Controller) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {

这里的 util.QualifiedName 其实就是 名称和 NS 的组合, 和 kubebuilder v2 传入参数类似:

1
2
3
4
type QualifiedName struct {
Namespace string
Name string
}

由于调协函数内容较长,我们分成几个部分进行深入剖析:

  • 第一部分 初始化 FTC 资源对象的 Spec 部分

调协函数一开始将获取的 FTC 请求资源获取并做些初始化处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 // 获取请求的 FTC 的名称 比如:deployments.apps
key := qualifiedName.String()
// 拷贝至缓存
cachedObj, err := c.objCopyFromCache(key)
if err != nil {
return util.StatusError
}

if cachedObj == nil {
return util.StatusAllOK
}
// 通过类型断言直接将缓存中的对象转换为 FTC 对象
typeConfig := cachedObj.(*corev1b1.FederatedTypeConfig)

// 初始化下字段里的数据
corev1b1.SetFederatedTypeConfigDefaults(typeConfig)
// 获取 FTC 中的 sync 和 status 控制器的启动状态
syncEnabled := typeConfig.GetPropagationEnabled()
statusEnabled := typeConfig.GetStatusEnabled()
  • 第二部分 初始化 FTC 对象对应的控制器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
 limitedScope := c.controllerConfig.TargetNamespace != metav1.NamespaceAll
if limitedScope && syncEnabled && !typeConfig.GetNamespaced() {
_, ok := c.getStopChannel(typeConfig.Name)
if !ok {
holderChan := make(chan struct{})
c.lock.Lock()
// 将该 FTC 的 channel 加入到 控制器 map 里
c.stopChannels[typeConfig.Name] = holderChan
c.lock.Unlock()
klog.Infof("Skipping start of sync & status controller for cluster-scoped resource %q. It is not required for a namespaced KubeFed control plane.", typeConfig.GetFederatedType().Kind)
}
// 初始化 FTC 的相关状态
typeConfig.Status.ObservedGeneration = typeConfig.Generation
typeConfig.Status.PropagationController = corev1b1.ControllerStatusNotRunning

if typeConfig.Status.StatusController == nil {
typeConfig.Status.StatusController = new(corev1b1.ControllerStatus)
}
// 更新 Status 状态
*typeConfig.Status.StatusController = corev1b1.ControllerStatusNotRunning
err = c.client.UpdateStatus(context.TODO(), typeConfig)
if err != nil {
runtime.HandleError(errors.Wrapf(err, "Could not update status fields of the CRD: %q", key))
return util.StatusError
}
return util.StatusAllOK
}
// 获取 statusKey、停止channel
statusKey := typeConfig.Name + "/status"
//相当于 sync 和 status 两个 channel 都 存在一个 map 里了,名字对应是 name 和 name/status
syncStopChan, syncRunning := c.getStopChannel(typeConfig.Name)
statusStopChan, statusRunning := c.getStopChannel(statusKey)
  • 第三部分 删除 FTC 资源策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
deleted := typeConfig.DeletionTimestamp != nil
if deleted {
// 停止两个控制器
if syncRunning {
c.stopController(typeConfig.Name, syncStopChan)
}
if statusRunning {
c.stopController(statusKey, statusStopChan)
}

if typeConfig.IsNamespace() {
klog.Infof("Reconciling all namespaced FederatedTypeConfig resources on deletion of %q", key)
// 更新 FTC
c.reconcileOnNamespaceFTCUpdate()
}
// 移除 Finalizer
err := c.removeFinalizer(typeConfig)
if err != nil {
runtime.HandleError(errors.Wrapf(err, "Failed to remove finalizer from FederatedTypeConfig %q", key))
return util.StatusError
}
return util.StatusAllOK
}
  • 第四部分 FTC 资源 Finalizer 设置策略

这块其实是给新创建的 FTC 资源用的,主要是用于给 FTC 增加 Finalizer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 // 确保 FTC	具有 Finalizer
// 如果没有就添加
updated, err := c.ensureFinalizer(typeConfig)
if err != nil {
runtime.HandleError(errors.Wrapf(err, "Failed to ensure finalizer for FederatedTypeConfig %q", key))
return util.StatusError
} else if updated && typeConfig.IsNamespace() {
// Detected creation of the namespace FTC. If there are existing FTCs
// which did not start their sync controllers due to the lack of a
// namespace FTC, then reconcile them now so they can start.
klog.Infof("Reconciling all namespaced FederatedTypeConfig resources on finalizer update for %q", key)
// 更新资源
c.reconcileOnNamespaceFTCUpdate()
}
  • 第五部分 控制器调协策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
 // 获取 Sync 控制器是否在运行
startNewSyncController := !syncRunning && syncEnabled
stopSyncController := syncRunning && (!syncEnabled || (typeConfig.GetNamespaced() && !c.namespaceFTCExists()))
if startNewSyncController {
// 没有启动 sync 控制器就启动下 sync 控制器
if err := c.startSyncController(typeConfig); err != nil {
runtime.HandleError(err)
return util.StatusError
}
// 如果 需要停止 sync 控制器就停止
} else if stopSyncController {
c.stopController(typeConfig.Name, syncStopChan)
}
// 获取 Status 控制器是否在运行
startNewStatusController := !statusRunning && statusEnabled
stopStatusController := statusRunning && !statusEnabled
if startNewStatusController {
// 如果出现异常就启动下 status 控制器
if err := c.startStatusController(statusKey, typeConfig); err != nil {
runtime.HandleError(err)
return util.StatusError
}
// 若 status 控制器被 stop,那么停止该控制器
} else if stopStatusController {
// 其实就是把 statusKey 的那个 channel 从 FTC 控制器的map里删除
c.stopController(statusKey, statusStopChan)
}
// 处理 Sync 控制器状态
if !startNewSyncController && !stopSyncController &&
typeConfig.Status.ObservedGeneration != typeConfig.Generation {
//
if err := c.refreshSyncController(typeConfig); err != nil {
runtime.HandleError(err)
return util.StatusError
}
}
// 设置 ObservedGeneration 状态
typeConfig.Status.ObservedGeneration = typeConfig.Generation
syncControllerRunning := startNewSyncController || (syncRunning && !stopSyncController)
// 更新 FTC 的 sync 控制器状态信息
if syncControllerRunning {
typeConfig.Status.PropagationController = corev1b1.ControllerStatusRunning
} else {
typeConfig.Status.PropagationController = corev1b1.ControllerStatusNotRunning
}
// 更新 FTC 的 status 控制器状态信息
if typeConfig.Status.StatusController == nil {
typeConfig.Status.StatusController = new(corev1b1.ControllerStatus)
}
statusControllerRunning := startNewStatusController || (statusRunning && !stopStatusController)
if statusControllerRunning {
*typeConfig.Status.StatusController = corev1b1.ControllerStatusRunning
} else {
*typeConfig.Status.StatusController = corev1b1.ControllerStatusNotRunning
}
// 更新 FTC 资源的状态
err = c.client.UpdateStatus(context.TODO(), typeConfig)
if err != nil {
runtime.HandleError(errors.Wrapf(err, "Could not update status fields of the CRD: %q", key))
return util.StatusError
}
return util.StatusAllOK

个人认为,kubefed 的 FTC 调协代码过于冗长,造成可读性差不易维护,可以将这5个部分独立拆分成处理方法,增强可读性的同时也方便维护,并且每一次调协,都需要定义那么多的变量,很可能只是一次 FTC资源的更新而出发的调协,就会造成很多无用的变量被定义。

这里有几个涉及到的方法,我们可以拿出来具体分析下:

  • startSyncController

这也是我们第一次见到和 sync 控制器相关的逻辑,我们先看看在 FTC 控制器里做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (c *Controller) startSyncController(tc *corev1b1.FederatedTypeConfig) error {
// TODO(marun) Consider using a shared informer for federated
// namespace that can be shared between all controllers of a
// cluster-scoped KubeFed control plane. A namespace-scoped
// control plane would still have to use a non-shared informer due
// to it not being possible to limit its scope.
// 首先获取 FTC
ftc := tc.DeepCopyObject().(*corev1b1.FederatedTypeConfig)
// 获取联邦类型的 Kind
kind := ftc.Spec.FederatedType.Kind
// 对于 sync 控制器管理的资源都需要提供 ns 用于配合联邦命名空间
var fedNamespaceAPIResource *metav1.APIResource
if ftc.GetNamespaced() {
var err error
fedNamespaceAPIResource, err = c.getFederatedNamespaceAPIResource()
if err != nil {
return errors.Wrapf(err, "Unable to start sync controller for %q due to missing FederatedTypeConfig for namespaces", kind)
}
}

stopChan := make(chan struct{})
// 这里才真正的启动了 sync 控制器
err := synccontroller.StartKubeFedSyncController(c.controllerConfig, stopChan, ftc, fedNamespaceAPIResource)
if err != nil {
close(stopChan)
return errors.Wrapf(err, "Error starting sync controller for %q", kind)
}
klog.Infof("Started sync controller for %q", kind)
c.lock.Lock()
defer c.lock.Unlock()
// 这里创建的 channel 用于管理相应的 sync 控制器
c.stopChannels[ftc.Name] = stopChan
return nil
}

看到这里其实基本上也就知道 sync 控制器是干啥的了,sync 控制器其实才是真正管理联邦自定义资源的控制器。

  • startStatusController

这也是我们第一次见到和 status 控制器相关的逻辑,功能比 sync 控制器要简单,其实也就是启动下 status控制器而已,同时生成一个 channel 用于 FTC 随时关闭相关的 status 控制器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (c *Controller) startStatusController(statusKey string, tc *corev1b1.FederatedTypeConfig) error {
kind := tc.Spec.FederatedType.Kind
stopChan := make(chan struct{})
ftc := tc.DeepCopyObject().(*corev1b1.FederatedTypeConfig)
err := statuscontroller.StartKubeFedStatusController(c.controllerConfig, stopChan, ftc)
if err != nil {
close(stopChan)
return errors.Wrapf(err, "Error starting status controller for %q", kind)
}
klog.Infof("Started status controller for %q", kind)
c.lock.Lock()
defer c.lock.Unlock()
c.stopChannels[statusKey] = stopChan
return nil
}
  • refreshSyncController

刷新就更简单粗暴了,就是停止再启动:

1
2
3
4
5
6
7
8
9
10
func (c *Controller) refreshSyncController(tc *corev1b1.FederatedTypeConfig) error {
klog.Infof("refreshing sync controller for %q", tc.Name)

syncStopChan, ok := c.getStopChannel(tc.Name)
if ok {
c.stopController(tc.Name, syncStopChan)
}

return c.startSyncController(tc)
}

那么整个的 FTC Controller 部分的逻辑就看完了, FTC 在 kubefed 里就像个中介一样,负责管理联邦资源(包括自定义资源)和 kubernetes 的资源(包括 CRD)的一一对应关系。

我们在使用 kubefedctl 命令行的 enable 其实就是创建一个 FTC 资源提交到中心的 API server,在中心的kubernetes集群里去触发 FTC 控制器来完成一系列的处理,我这里找到了 enable 中生成 FTC 资源的函数,大家看了就明白了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func GenerateTypeConfigForTarget(apiResource metav1.APIResource, enableTypeDirective *EnableTypeDirective) typeconfig.Interface {
spec := enableTypeDirective.Spec
kind := apiResource.Kind
pluralName := apiResource.Name
// 生成 FTC 资源对象
typeConfig := &fedv1b1.FederatedTypeConfig{
// Explicitly including TypeMeta will ensure it will be
// serialized properly to yaml.
TypeMeta: metav1.TypeMeta{
Kind: "FederatedTypeConfig",
APIVersion: "core.kubefed.io/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: typeconfig.GroupQualifiedName(apiResource),
},
Spec: fedv1b1.FederatedTypeConfigSpec{
TargetType: fedv1b1.APIResource{
Version: apiResource.Version,
Kind: kind,
Scope: NamespacedToScope(apiResource),
},
Propagation: fedv1b1.PropagationEnabled,
FederatedType: fedv1b1.APIResource{
Group: spec.FederatedGroup,
Version: spec.FederatedVersion,
Kind: fmt.Sprintf("Federated%s", kind),
PluralName: fmt.Sprintf("federated%s", pluralName),
Scope: FederatedNamespacedToScope(apiResource),
},
},
}
// 设置下 FTC 资源对象的一些默认值
fedv1b1.SetFederatedTypeConfigDefaults(typeConfig)
return typeConfig
}

我们继续看看调用这个函数的原函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func GetResources(config *rest.Config, enableTypeDirective *EnableTypeDirective) (*typeResources, error) {
// 从这个集群里已经存在的所有资源类型(包括CRD)找到需要联邦管理的资源
apiResource, err := LookupAPIResource(config, enableTypeDirective.Name, enableTypeDirective.Spec.TargetVersion)
if err != nil {
return nil, err
}
klog.V(2).Infof("Found type %q", resourceKey(*apiResource))
// 开始调用上述的函数了
typeConfig := GenerateTypeConfigForTarget(*apiResource, enableTypeDirective)
// 这里的 accessor 在后面用于给 API 做 Validation Schema
accessor, err := newSchemaAccessor(config, *apiResource)
if err != nil {
return nil, errors.Wrap(err, "Error initializing validation schema accessor")
}

shortNames := []string{}
for _, shortName := range apiResource.ShortNames {
shortNames = append(shortNames, fmt.Sprintf("f%s", shortName))
}
// 生成联邦资源CRD
crd := federatedTypeCRD(typeConfig, accessor, shortNames)
return &typeResources{
TypeConfig: typeConfig,
CRD: crd,
}, nil
}

这下大家都看明白了,其实也就是对创建一个 FTC 然后再声明一个联邦 CRD 和 FTC 对象关联,并通过 sync 控制器去获取联邦资源管理的真正的 kubernetes 资源在集群中的状态来实现对联邦资源的管理。

也就是说,FTC 资源其实是对 kubernetes 资源映射,当 FTC 控制器把 sync 控制器启动了之后,sync 控制器再去监控联邦 CRD (即以federated开头的一些联邦资源),这些资源的创建其实才是 sync 控制器管理的真正对象。FTC 控制器的真正目的其实就是看那些kubernetes资源对象需要被联邦管理,哪些不需要。需要的资源对象,你就创建一个 FTC 对象声明下,那么 FTC 控制器会启动一个 sync 控制器来跟进这个kubernetes 对象(或kubernetes CRD)的变化。

CATALOG
  1. 1. 透视 kubefed 源码 —— FTC Controller