透视 kubefed 源码 — Sync Controller
KubeFedSyncController 的主要功能是同步联邦中的联邦资源状态。
因为 Sync 控制器监控的 CRD 是 FTC 控制器创建出来的,所以我们找不到一个具体的CRD API Types 定义,我们可以直接看看 CR 长什么样子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| apiVersion: types.kubefed.io/v1beta1 kind: FederatedDeployment metadata: name: test-deployment namespace: test-namespace spec: template: metadata: labels: app: nginx spec: replicas: 3 selector: matchLabels: app: nginx template: metadata: labels: app: nginx spec: containers: - image: nginx name: nginx placement: clusters: - name: center - name: us overrides: - clusterName: us clusterOverrides: - path: "/spec/replicas" value: 5 - path: "/spec/template/spec/containers/0/image" value: "nginx:1.17.0-alpine" - path: "/metadata/annotations" op: "add" value: foo: bar - path: "/metadata/annotations/foo" op: "remove"
|
这是一个 FederatedDeployment 的例子,看上去其实和 Kubernetes 原生 Deployment 不太大,spec 里其实就是设置真正的 Deployment 的内容,但是对于联邦而言,这个 Deployment 对象不一定应用多所有联邦管辖的集群里,所以在 FederatedDeployment 里需要设置 placement 来声明这个 FederatedDeployment 需要应用到哪些集群里。同时,对于不同的集群,可能副本和镜像名称都会有些许变化,所以在 FederatedDeployment 里,你可以设置 overrides 来声明在不同的集群里,哪些字段需要发生改变。
FederatedDeployment 只是其中的一种联邦资源,其他更多的联邦资源可以通过定义 FTC 来实现。
我们来具体看看 Sync 控制器的内部组成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| type KubeFedSyncController struct { worker util.ReconcileWorker
clusterDeliverer *util.DelayingDeliverer
informer util.FederatedInformer
eventRecorder record.EventRecorder clusterAvailableDelay time.Duration clusterUnavailableDelay time.Duration smallDelay time.Duration typeConfig typeconfig.Interface fedAccessor FederatedResourceAccessor hostClusterClient genericclient.Client
skipAdoptingResources bool
limitedScope bool }
|
接着我们看看 Sync 控制器的启动函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func StartKubeFedSyncController(controllerConfig *util.ControllerConfig, stopChan <-chan struct{}, typeConfig typeconfig.Interface, fedNamespaceAPIResource *metav1.APIResource) error { controller, err := newKubeFedSyncController(controllerConfig, typeConfig, fedNamespaceAPIResource) if err != nil { return err } if controllerConfig.MinimizeLatency { controller.minimizeLatency() } klog.Infof(fmt.Sprintf("Starting sync controller for %q", typeConfig.GetFederatedType().Kind)) controller.Run(stopChan) return nil }
|
看完启动函数,我们了解到,启动一个 sync 控制器其实需要关联一个 typeConfig(之后简称 TC),该 sync 控制器就用于监控该 TC 资源类型的对象变更。
其中.启动函数里的创建 sync 控制器函数需要做一些初始化的操作,你可以通过我在代码里的注释了解他的工作原理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfig typeconfig.Interface, fedNamespaceAPIResource *metav1.APIResource) (*KubeFedSyncController, error) { federatedTypeAPIResource := typeConfig.GetFederatedType() userAgent := fmt.Sprintf("%s-controller", strings.ToLower(federatedTypeAPIResource.Kind)) client := genericclient.NewForConfigOrDieWithUserAgent(controllerConfig.KubeConfig, userAgent) kubeClient := kubeclient.NewForConfigOrDie(controllerConfig.KubeConfig) s := &KubeFedSyncController{ clusterAvailableDelay: controllerConfig.ClusterAvailableDelay, clusterUnavailableDelay: controllerConfig.ClusterUnavailableDelay, smallDelay: time.Second * 3, eventRecorder: recorder, typeConfig: typeConfig, hostClusterClient: client, skipAdoptingResources: controllerConfig.SkipAdoptingResources, limitedScope: controllerConfig.LimitedScope(), } s.worker = util.NewReconcileWorker(s.reconcile, util.WorkerTiming{ ClusterSyncDelay: s.clusterAvailableDelay, })
s.clusterDeliverer = util.NewDelayingDeliverer() targetAPIResource := typeConfig.GetTargetType()
var err error s.informer, err = util.NewFederatedInformer( controllerConfig, client, &targetAPIResource, func(obj pkgruntime.Object) { qualifiedName := util.NewQualifiedName(obj) s.worker.EnqueueForRetry(qualifiedName) }, &util.ClusterLifecycleHandlerFuncs{ ClusterAvailable: func(cluster *fedv1b1.KubeFedCluster) { s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay)) }, ClusterUnavailable: func(cluster *fedv1b1.KubeFedCluster, _ []interface{}) { s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay)) }, }, ) if err != nil { return nil, err } s.fedAccessor, err = NewFederatedResourceAccessor( controllerConfig, typeConfig, fedNamespaceAPIResource, client, s.worker.EnqueueObject, recorder) if err != nil { return nil, err } return s, nil }
|
这下看明白了,其实联邦只负责把真正的 kubernetes 资源丢给相应的集群,然后他不会去管具体集群里这个kubernetes 资源是怎么部署的,副本是多少,这些他并不关心。同步控制器只关心你的这个集群是否是可用的,如果可用就把 kubernetes 资源丢给你 apply。触发调协也只是当集群的状态发生了变化或者用户对联邦资源做了修改,才会触发 sync 的调协(我 Sync 控制器只是个送信的!)。
接着我们看看 Sync 控制器的启动函数里涉及到的 Run 函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func (s *KubeFedSyncController) Run(stopChan <-chan struct{}) { s.fedAccessor.Run(stopChan) s.informer.Start() s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { s.reconcileOnClusterChange() }) s.worker.Run(stopChan)
go func() { <-stopChan s.informer.Stop() s.clusterDeliverer.Stop() }() }
|
Run只是封装了单纯的启动流程,不多废话。
在看真正的调协函数之前,我们先理清几个在调协函数里出现的几个函数,理解了这些函数,我们看调协函数的逻辑就一目了然了。
该函数用于检查所有 store 是否已经同步了。 如果 informer 或 store 尚未与相应的 api server 同步,则返回 False。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func (s *KubeFedSyncController) isSynced() bool { if !s.informer.ClustersSynced() { klog.V(2).Infof("Cluster list not synced") return false } if !s.fedAccessor.HasSynced() { return false }
clusters, err := s.informer.GetReadyClusters() if err != nil { runtime.HandleError(errors.Wrap(err, "Failed to get ready clusters")) return false } if !s.informer.GetTargetStore().ClustersSynced(clusters) { return false } return true }
|
该函数用于触发所以联邦资源的调协(因为集群发生了变更)
1 2 3 4 5 6 7 8 9 10 11
| func (s *KubeFedSyncController) reconcileOnClusterChange() { if !s.isSynced() { s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay)) } s.fedAccessor.VisitFederatedResources(func(obj interface{}) { qualifiedName := util.NewQualifiedName(obj.(pkgruntime.Object)) s.worker.EnqueueWithDelay(qualifiedName, s.smallDelay) }) }
|
这两个看完了,我们来看看真正的调协函数在做哪些东西:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func (s *KubeFedSyncController) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus { if !s.isSynced() { return util.StatusNotSynced } kind := s.typeConfig.GetFederatedType().Kind fedResource, possibleOrphan, err := s.fedAccessor.FederatedResource(qualifiedName) if err != nil { runtime.HandleError(errors.Wrapf(err, "Error creating FederatedResource helper for %s %q", kind, qualifiedName)) return util.StatusError } key := fedResource.FederatedName().String()
klog.V(4).Infof("Starting to reconcile %s %q", kind, key) startTime := time.Now() defer func() { klog.V(4).Infof("Finished reconciling %s %q (duration: %v)", kind, key, time.Since(startTime)) metrics.ReconcileFederatedResourcesDurationFromStart(startTime) }() if fedResource.Object().GetDeletionTimestamp() != nil { return s.ensureDeletion(fedResource) } err = s.ensureFinalizer(fedResource) if err != nil { fedResource.RecordError("EnsureFinalizerError", errors.Wrap(err, "Failed to ensure finalizer")) return util.StatusError } return s.syncToClusters(fedResource) }
|
看起来好像只是做了一个删除资源的处理,其实最后的 syncToClusters 才是大头:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) util.ReconciliationStatus { clusters, err := s.informer.GetClusters() if err != nil { fedResource.RecordError(string(status.ClusterRetrievalFailed), errors.Wrap(err, "Failed to retrieve list of clusters")) return s.setFederatedStatus(fedResource, status.ClusterRetrievalFailed, nil) } selectedClusterNames, err := fedResource.ComputePlacement(clusters) if err != nil { fedResource.RecordError(string(status.ComputePlacementFailed), errors.Wrap(err, "Failed to compute placement")) return s.setFederatedStatus(fedResource, status.ComputePlacementFailed, nil) } kind := fedResource.TargetKind() key := fedResource.TargetName().String() klog.V(4).Infof("Ensuring %s %q in clusters: %s", kind, key, strings.Join(selectedClusterNames.List(), ",")) dispatcher := dispatch.NewManagedDispatcher(s.informer.GetClientForCluster, fedResource, s.skipAdoptingResources) for _, cluster := range clusters { clusterName := cluster.Name selectedCluster := selectedClusterNames.Has(clusterName) if !util.IsClusterReady(&cluster.Status) { if selectedCluster { err := errors.New("Cluster not ready") dispatcher.RecordClusterError(status.ClusterNotReady, clusterName, err) } continue } rawClusterObj, _, err := s.informer.GetTargetStore().GetByKey(clusterName, key) if err != nil { wrappedErr := errors.Wrap(err, "Failed to retrieve cached cluster object") dispatcher.RecordClusterError(status.CachedRetrievalFailed, clusterName, wrappedErr) continue } var clusterObj *unstructured.Unstructured if rawClusterObj != nil { clusterObj = rawClusterObj.(*unstructured.Unstructured) }
if !selectedCluster { if clusterObj == nil { continue } if clusterObj.GetDeletionTimestamp() != nil { dispatcher.RecordStatus(clusterName, status.WaitingForRemoval) continue } if fedResource.IsNamespaceInHostCluster(clusterObj) { dispatcher.RemoveManagedLabel(clusterName, clusterObj) } else { dispatcher.Delete(clusterName) } continue }
if clusterObj == nil { dispatcher.Create(clusterName) } else { dispatcher.Update(clusterName, clusterObj) } } _, timeoutErr := dispatcher.Wait() if timeoutErr != nil { fedResource.RecordError("OperationTimeoutError", timeoutErr) } updatedVersionMap := dispatcher.VersionMap() err = fedResource.UpdateVersions(selectedClusterNames.List(), updatedVersionMap) if err != nil { runtime.HandleError(err) } collectedStatus := dispatcher.CollectedStatus() return s.setFederatedStatus(fedResource, status.AggregateSuccess, &collectedStatus) }
|
通过上面的代码可以看出,这里 dispatcher 去在具体某个集群上做真正的 CRUD 资源操作。个人认为,这里的处理可以改成并发模型处理,如果联邦里集群很多,那么这个for循环一个个找集群做调协可能需要花费很长的时间。
上面的函数最终还需要设置联邦的状态,这个函数主要是设置 conditions 和 联邦资源对象的 map 中的集群字段,最后的返回值就是调协的最终状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| func (s *KubeFedSyncController) setFederatedStatus(fedResource FederatedResource, reason status.AggregateReason,collectedStatus *status.CollectedPropagationStatus) util.ReconciliationStatus { if collectedStatus == nil { collectedStatus = &status.CollectedPropagationStatus{} } kind := fedResource.FederatedKind() name := fedResource.FederatedName() obj := fedResource.Object() if reason == status.AggregateSuccess { if !s.limitedScope && fedResource.NamespaceNotFederated() { reason = status.NamespaceNotFederated } } err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { if updateRequired, err := status.SetFederatedStatus(obj, reason, *collectedStatus); err != nil { return false, errors.Wrapf(err, "failed to set the status") } else if !updateRequired { klog.V(4).Infof("No status update necessary for %s %q", kind, name) return true, nil } err := s.hostClusterClient.UpdateStatus(context.TODO(), obj) if err == nil { return true, nil } if apierrors.IsConflict(err) { klog.V(2).Infof("Failed to set propagation status for %s %q due to conflict (will retry): %v.", kind, name, err) err := s.hostClusterClient.Get(context.TODO(), obj, obj.GetNamespace(), obj.GetName()) if err != nil { return false, errors.Wrapf(err, "failed to retrieve resource") } return false, nil } return false, errors.Wrapf(err, "failed to update resource") }) if err != nil { runtime.HandleError(errors.Wrapf(err, "failed to set propagation status for %s %q", kind, name)) return util.StatusError } return util.StatusAllOK }
|
那么以上就是 Sync 控制器的整体逻辑,这里涉及到的联邦资源状态我们在下一个章节继续探讨。