Funky's NoteBook

Kubefed - Sync Controller

字数统计: 3,349阅读时长: 16 min
2020/06/03 Share

透视 kubefed 源码 — Sync Controller

KubeFedSyncController 的主要功能是同步联邦中的联邦资源状态。

因为 Sync 控制器监控的 CRD 是 FTC 控制器创建出来的,所以我们找不到一个具体的CRD API Types 定义,我们可以直接看看 CR 长什么样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
apiVersion: types.kubefed.io/v1beta1
kind: FederatedDeployment
metadata:
name: test-deployment
namespace: test-namespace
spec:
template:
metadata:
labels:
app: nginx
spec:
replicas: 3
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- image: nginx
name: nginx
placement:
clusters:
- name: center
- name: us
overrides:
- clusterName: us
clusterOverrides:
- path: "/spec/replicas"
value: 5
- path: "/spec/template/spec/containers/0/image"
value: "nginx:1.17.0-alpine"
- path: "/metadata/annotations"
op: "add"
value:
foo: bar
- path: "/metadata/annotations/foo"
op: "remove"

这是一个 FederatedDeployment 的例子,看上去其实和 Kubernetes 原生 Deployment 不太大,spec 里其实就是设置真正的 Deployment 的内容,但是对于联邦而言,这个 Deployment 对象不一定应用多所有联邦管辖的集群里,所以在 FederatedDeployment 里需要设置 placement 来声明这个 FederatedDeployment 需要应用到哪些集群里。同时,对于不同的集群,可能副本和镜像名称都会有些许变化,所以在 FederatedDeployment 里,你可以设置 overrides 来声明在不同的集群里,哪些字段需要发生改变。

FederatedDeployment 只是其中的一种联邦资源,其他更多的联邦资源可以通过定义 FTC 来实现。

我们来具体看看 Sync 控制器的内部组成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type KubeFedSyncController struct {
// TODO(marun) add comment
worker util.ReconcileWorker

// clusterDeliverer 用于触发使用联邦资源的调协
// 当一个的新集群 ready 时,clusterDeliverer
// 将会被使用
clusterDeliverer *util.DelayingDeliverer

// 成员集群的资源 Informer
informer util.FederatedInformer

// For events
eventRecorder record.EventRecorder

clusterAvailableDelay time.Duration
clusterUnavailableDelay time.Duration
smallDelay time.Duration
// FTC 接口
typeConfig typeconfig.Interface
// 联邦资源访问器
fedAccessor FederatedResourceAccessor
// 主节点 Client
hostClusterClient genericclient.Client

skipAdoptingResources bool

limitedScope bool
}

接着我们看看 Sync 控制器的启动函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func StartKubeFedSyncController(controllerConfig *util.ControllerConfig, stopChan <-chan struct{}, typeConfig typeconfig.Interface, fedNamespaceAPIResource *metav1.APIResource) error {
// 启动一个同步控制器,并指明监听指定 FTC 资源类型
controller, err := newKubeFedSyncController(controllerConfig, typeConfig, fedNamespaceAPIResource)
if err != nil {
return err
}
// 设置控制器最低延迟
if controllerConfig.MinimizeLatency {
controller.minimizeLatency()
}
klog.Infof(fmt.Sprintf("Starting sync controller for %q", typeConfig.GetFederatedType().Kind))
// 启动同步控制器
controller.Run(stopChan)
return nil
}

看完启动函数,我们了解到,启动一个 sync 控制器其实需要关联一个 typeConfig(之后简称 TC),该 sync 控制器就用于监控该 TC 资源类型的对象变更。

其中.启动函数里的创建 sync 控制器函数需要做一些初始化的操作,你可以通过我在代码里的注释了解他的工作原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfig typeconfig.Interface, fedNamespaceAPIResource *metav1.APIResource) (*KubeFedSyncController, error) {
// 将 TC 转换为联邦资源
federatedTypeAPIResource := typeConfig.GetFederatedType()
userAgent := fmt.Sprintf("%s-controller", strings.ToLower(federatedTypeAPIResource.Kind))

// 初始化非动态 client 以防配置被污染
client := genericclient.NewForConfigOrDieWithUserAgent(controllerConfig.KubeConfig, userAgent)
kubeClient := kubeclient.NewForConfigOrDie(controllerConfig.KubeConfig)

// 这里就真正定义了控制器结构体
s := &KubeFedSyncController{
clusterAvailableDelay: controllerConfig.ClusterAvailableDelay,
clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay,
smallDelay: time.Second * 3,
eventRecorder: recorder,
typeConfig: typeConfig,
hostClusterClient: client,
skipAdoptingResources: controllerConfig.SkipAdoptingResources,
limitedScope: controllerConfig.LimitedScope(),
}
// 设置调协的协程数
s.worker = util.NewReconcileWorker(s.reconcile, util.WorkerTiming{
ClusterSyncDelay: s.clusterAvailableDelay,
})

// 构建 deliverer 用于触发集群调协
s.clusterDeliverer = util.NewDelayingDeliverer()
// 目标资源,即真正的 kubernetes 资源
targetAPIResource := typeConfig.GetTargetType()

// 用于监听成员集群中联邦资源变化的 informer
// 1.
// 这里以传参为 pkgruntime.Object 的匿名函数其实是触发函数,
// 其实就是联邦资源发送变更触发。
// 2.
// 这里 ClusterLifecycleHandlerFuncs 其实是集群的生命周期的
// 处理函数(两个方法被封装到了一个处理结构体里),
// 当成员集群的状态发生了变更,集群需要重新处理目标资源(调协)。
var err error
s.informer, err = util.NewFederatedInformer(
controllerConfig,
client,
&targetAPIResource,
func(obj pkgruntime.Object) {
qualifiedName := util.NewQualifiedName(obj)
s.worker.EnqueueForRetry(qualifiedName)
},
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *fedv1b1.KubeFedCluster) {
// 当新的成员集群可用时,重新处理所有目标资源。
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
},
// 当某个成员集群不可用时,也需要重新处理所有目标资源。
ClusterUnavailable: func(cluster *fedv1b1.KubeFedCluster, _ []interface{}) {
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay))
},
},
)
if err != nil {
return nil, err
}
// 设置 sync 控制器的联邦资源 Accessor
s.fedAccessor, err = NewFederatedResourceAccessor(
controllerConfig, typeConfig, fedNamespaceAPIResource,
client, s.worker.EnqueueObject, recorder)
if err != nil {
return nil, err
}
// 把 KubeFedSyncController 里的所有对象初始化完毕返回就可以了
return s, nil
}

这下看明白了,其实联邦只负责把真正的 kubernetes 资源丢给相应的集群,然后他不会去管具体集群里这个kubernetes 资源是怎么部署的,副本是多少,这些他并不关心。同步控制器只关心你的这个集群是否是可用的,如果可用就把 kubernetes 资源丢给你 apply。触发调协也只是当集群的状态发生了变化或者用户对联邦资源做了修改,才会触发 sync 的调协(我 Sync 控制器只是个送信的!)。

接着我们看看 Sync 控制器的启动函数里涉及到的 Run 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (s *KubeFedSyncController) Run(stopChan <-chan struct{}) {
// 启动联邦资源 Accessor
s.fedAccessor.Run(stopChan)
// 启动 informer
s.informer.Start()
// 启动集群 Deliverer
s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
s.reconcileOnClusterChange()
})
// 启动调协的协程
s.worker.Run(stopChan)

// 一旦收到撤销信号,立即停止 informer 和集群 Deliverer
go func() {
<-stopChan
s.informer.Stop()
s.clusterDeliverer.Stop()
}()
}

Run只是封装了单纯的启动流程,不多废话。

在看真正的调协函数之前,我们先理清几个在调协函数里出现的几个函数,理解了这些函数,我们看调协函数的逻辑就一目了然了。

  • isSynced

该函数用于检查所有 store 是否已经同步了。 如果 informer 或 store 尚未与相应的 api server 同步,则返回 False。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (s *KubeFedSyncController) isSynced() bool {
// 集群是否同步了
if !s.informer.ClustersSynced() {
klog.V(2).Infof("Cluster list not synced")
return false
}
// 联邦 Accessor 是否同步了
if !s.fedAccessor.HasSynced() {
return false
}

// 那些已经是 Ready 的集群
clusters, err := s.informer.GetReadyClusters()
if err != nil {
runtime.HandleError(errors.Wrap(err, "Failed to get ready clusters"))
return false
}
// Store 是否同步了集群状态
if !s.informer.GetTargetStore().ClustersSynced(clusters) {
return false
}
return true
}
  • reconcileOnClusterChange

该函数用于触发所以联邦资源的调协(因为集群发生了变更)

1
2
3
4
5
6
7
8
9
10
11
func (s *KubeFedSyncController) reconcileOnClusterChange() {
// 没有同步就触发下同步
if !s.isSynced() {
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
}
// 触发调协
s.fedAccessor.VisitFederatedResources(func(obj interface{}) {
qualifiedName := util.NewQualifiedName(obj.(pkgruntime.Object))
s.worker.EnqueueWithDelay(qualifiedName, s.smallDelay)
})
}

这两个看完了,我们来看看真正的调协函数在做哪些东西:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
// Sync 控制器是否同步了相关信息
if !s.isSynced() {
return util.StatusNotSynced
}
// 获取kubernetes 资源类
kind := s.typeConfig.GetFederatedType().Kind
// 获取联邦资源
fedResource, possibleOrphan, err := s.fedAccessor.FederatedResource(qualifiedName)
if err != nil {
runtime.HandleError(errors.Wrapf(err, "Error creating FederatedResource helper for %s %q", kind, qualifiedName))
return util.StatusError
}
//...
// 获取联邦资源名称
key := fedResource.FederatedName().String()

klog.V(4).Infof("Starting to reconcile %s %q", kind, key)
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished reconciling %s %q (duration: %v)", kind, key, time.Since(startTime))
metrics.ReconcileFederatedResourcesDurationFromStart(startTime)
}()
// 如果删除资源,确保删除联邦对象在各个进群中对应的kubernetes 资源对象
if fedResource.Object().GetDeletionTimestamp() != nil {
return s.ensureDeletion(fedResource)
}
// 确保 Finalizer
// 这个和之前看的 Finalizer 处理方式类似
err = s.ensureFinalizer(fedResource)
if err != nil {
fedResource.RecordError("EnsureFinalizerError", errors.Wrap(err, "Failed to ensure finalizer"))
return util.StatusError
}
// 这才开始真正处理同步集群中的资源
return s.syncToClusters(fedResource)
}

看起来好像只是做了一个删除资源的处理,其实最后的 syncToClusters 才是大头:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) util.ReconciliationStatus {
// 获取集群列表
clusters, err := s.informer.GetClusters()
if err != nil {
fedResource.RecordError(string(status.ClusterRetrievalFailed), errors.Wrap(err, "Failed to retrieve list of clusters"))
return s.setFederatedStatus(fedResource, status.ClusterRetrievalFailed, nil)
}
// 通过联邦资源里的 Placement 找到被选中的联邦集群
selectedClusterNames, err := fedResource.ComputePlacement(clusters)
if err != nil {
fedResource.RecordError(string(status.ComputePlacementFailed), errors.Wrap(err, "Failed to compute placement"))
return s.setFederatedStatus(fedResource, status.ComputePlacementFailed, nil)
}
// 找到真正的kubernetes 资源 GVK 和对应的资源名称
kind := fedResource.TargetKind()
key := fedResource.TargetName().String()
klog.V(4).Infof("Ensuring %s %q in clusters: %s", kind, key, strings.Join(selectedClusterNames.List(), ","))

dispatcher := dispatch.NewManagedDispatcher(s.informer.GetClientForCluster, fedResource, s.skipAdoptingResources)
// 分发 kubernetes 资源对象
for _, cluster := range clusters {
clusterName := cluster.Name
// 联邦资源是否选中了这个节点
selectedCluster := selectedClusterNames.Has(clusterName)
// 集群不是 ready 状态
if !util.IsClusterReady(&cluster.Status) {
// 但是集群被选中了
if selectedCluster {
err := errors.New("Cluster not ready")
dispatcher.RecordClusterError(status.ClusterNotReady, clusterName, err)
}
// 报个错,继续找下一个集群
continue
}
// 获取集群对象
rawClusterObj, _, err := s.informer.GetTargetStore().GetByKey(clusterName, key)
if err != nil {
wrappedErr := errors.Wrap(err, "Failed to retrieve cached cluster object")
dispatcher.RecordClusterError(status.CachedRetrievalFailed, clusterName, wrappedErr)
continue
}

var clusterObj *unstructured.Unstructured
if rawClusterObj != nil {
clusterObj = rawClusterObj.(*unstructured.Unstructured)
}

// 如果资源没有选中该集群,
// 需要判断这个集群现在还是否在联邦的管理之下
if !selectedCluster {
// 这个集群已经不在了
if clusterObj == nil {
continue
}
// 这个集群还在,但是这个集群要被移除了
if clusterObj.GetDeletionTimestamp() != nil {
// 把该资源标记为
dispatcher.RecordStatus(clusterName, status.WaitingForRemoval)
continue
}
// 这个资源还在中心集群里跑
if fedResource.IsNamespaceInHostCluster(clusterObj) {
// 中心集群需要管理被删除管理标签以便不再缓存该集群
dispatcher.RemoveManagedLabel(clusterName, clusterObj)
} else {
// 在该集群里删除
dispatcher.Delete(clusterName)
}
continue
}

// 如果资源选中了该集群,
// 集群资源之前没有创建过改资源
if clusterObj == nil {
// 在该集群上创建该资源
dispatcher.Create(clusterName)
} else {
// 在该集群上更新该资源
dispatcher.Update(clusterName, clusterObj)
}
}
_, timeoutErr := dispatcher.Wait()
if timeoutErr != nil {
fedResource.RecordError("OperationTimeoutError", timeoutErr)
}

// 更新改资源的版本号
updatedVersionMap := dispatcher.VersionMap()
err = fedResource.UpdateVersions(selectedClusterNames.List(), updatedVersionMap)
if err != nil {
runtime.HandleError(err)
}
// 获取资源的分发状态
collectedStatus := dispatcher.CollectedStatus()
// 更新资源状态,需要传入联邦资源对象和资源对象的分发状态。
return s.setFederatedStatus(fedResource, status.AggregateSuccess, &collectedStatus)
}

通过上面的代码可以看出,这里 dispatcher 去在具体某个集群上做真正的 CRUD 资源操作。个人认为,这里的处理可以改成并发模型处理,如果联邦里集群很多,那么这个for循环一个个找集群做调协可能需要花费很长的时间。

上面的函数最终还需要设置联邦的状态,这个函数主要是设置 conditions 和 联邦资源对象的 map 中的集群字段,最后的返回值就是调协的最终状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (s *KubeFedSyncController) setFederatedStatus(fedResource FederatedResource,
reason status.AggregateReason,collectedStatus *status.CollectedPropagationStatus) util.ReconciliationStatus {
// 如果状态没设置
if collectedStatus == nil {
collectedStatus = &status.CollectedPropagationStatus{}
}
// 获取联邦资源对象的信息
kind := fedResource.FederatedKind()
name := fedResource.FederatedName()
obj := fedResource.Object()
// 随时报告 propagation 失败的 reason
// 如果没有显式错误,则报告 NamespaceNotFederated
if reason == status.AggregateSuccess {
// 对于控制平面而言,当一个命名空间里有联邦资源,
// 但是这个命名空间没有定义为联邦资源,那么需要报告
//
// The KubeFed system 的命名空间是隐含的联邦资源,并且在控制平面命名空间里管辖
if !s.limitedScope && fedResource.NamespaceNotFederated() {
reason = status.NamespaceNotFederated
}
}
// 如果 kubernetes 资源状态发生变更,则需要不断尝试恢复和更新
// 这里设置了间隔1秒超时5秒的更新策略。
err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
// 设置联邦资源状态是否存在问题
if updateRequired, err := status.SetFederatedStatus(obj, reason, *collectedStatus); err != nil {
return false, errors.Wrapf(err, "failed to set the status")
// 是否需要更新状态
} else if !updateRequired {
klog.V(4).Infof("No status update necessary for %s %q", kind, name)
return true, nil
}
// 更新中心集群里该联邦的状态
err := s.hostClusterClient.UpdateStatus(context.TODO(), obj)
if err == nil {
return true, nil
}
// 更新 API 状态资源发生冲突的处理
if apierrors.IsConflict(err) {
klog.V(2).Infof("Failed to set propagation status for %s %q due to conflict (will retry): %v.", kind, name, err)
err := s.hostClusterClient.Get(context.TODO(), obj, obj.GetNamespace(), obj.GetName())
if err != nil {
return false, errors.Wrapf(err, "failed to retrieve resource")
}
return false, nil
}
return false, errors.Wrapf(err, "failed to update resource")
})
if err != nil {
runtime.HandleError(errors.Wrapf(err, "failed to set propagation status for %s %q", kind, name))
return util.StatusError
}

return util.StatusAllOK
}

那么以上就是 Sync 控制器的整体逻辑,这里涉及到的联邦资源状态我们在下一个章节继续探讨。

CATALOG
  1. 1. 透视 kubefed 源码 — Sync Controller