kubernetes scheduler 浅析
什么是kubernetes scheduler?
小到运行着几十个工作负载的 kubernetes 集群,大到运行成千上万个工作负载 kubernetes 集群,每个工作负载到底应该在哪里运行,这需要一个聪明的大脑进行指挥,kubernetes scheduler 就是这个聪明的大脑。从结果看,他的工作很简单,只是为 pod.spec.nodeName 填充上一个 node 的名字而已,从过程看,他又是极其复杂的,因为到底要选哪个节点才最合理,答案往往是和场景强相关的,几乎找不到一套适应各种场景的调度算法。因此,各式各样的算法插件也层出不穷,公司对调度算法的定制化开发也成了常见需求。
调度器如何运行的?
一个调度器主要是由这样两个大循环构成
源码传送门
第一阶段:PreScore
func (pl *SelectorSpread) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {if skipSelectorSpread(pod) {return nil}var selector labels.Selectorselector = helper.DefaultSelector(pod,pl.services,pl.replicationControllers,pl.replicaSets,pl.statefulSets,)state := &preScoreState{selector: selector,}cycleState.Write(preScoreStateKey, state)return nil}复制代码
func DefaultSelector(pod *v1.Pod, sl corelisters.ServiceLister, cl corelisters.ReplicationControllerLister, rsl appslisters.ReplicaSetLister, ssl appslisters.StatefulSetLister) labels.Selector {labelSet := make(labels.Set)// Since services, RCs, RSs and SSs match the pod, they won't have conflicting// labels. Merging is safe.if services, err := GetPodServices(sl, pod); err == nil {for _, service := range services {labelSet = labels.Merge(labelSet, service.Spec.Selector)}}if rcs, err := cl.GetPodControllers(pod); err == nil {for _, rc := range rcs {labelSet = labels.Merge(labelSet, rc.Spec.Selector)}}selector := labels.NewSelector()if len(labelSet) != 0 {selector = labelSet.AsSelector()}if rss, err := rsl.GetPodReplicaSets(pod); err == nil {for _, rs := range rss {if other, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {if r, ok := other.Requirements(); ok {selector = selector.Add(r...)}}}}if sss, err := ssl.GetPodStatefulSets(pod); err == nil {for _, ss := range sss {if other, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {if r, ok := other.Requirements(); ok {selector = selector.Add(r...)}}}}return selector}复制代码
在方法签名里的 cycleState 作为调度不同阶段传递数据的一个仓库,这一阶段做的事情并不多,就是把所有包含 pod label 的控制器找出来,并保存到 cycleState,供下一阶段使用。需要注意的是 pod 的控制器必须是 Services、ReplicaSets 和 StatefulSets,replicationControllers 其中之一。
第二阶段:Score
func (pl *SelectorSpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {if skipSelectorSpread(pod) {return 0, nil}c, err := state.Read(preScoreStateKey)if err != nil {return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Error reading %q from cycleState: %v", preScoreStateKey, err))}s, ok := c.(*preScoreState)if !ok {return 0, framework.NewStatus(framework.Error, fmt.Sprintf("%+v convert to tainttoleration.preScoreState error", c))}nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)if err != nil {return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))}count := countMatchingPods(pod.Namespace, s.selector, nodeInfo)return int64(count), nil}复制代码
func skipSelectorSpread(pod *v1.Pod) bool {return len(pod.Spec.TopologySpreadConstraints) != 0}复制代码
func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *framework.NodeInfo) int {if len(nodeInfo.Pods) == 0 || selector.Empty() {return 0}count := 0for _, p := range nodeInfo.Pods {// Ignore pods being deleted for spreading purposes// Similar to how it is done for SelectorSpreadPriorityif namespace == p.Pod.Namespace && p.Pod.DeletionTimestamp == nil {if selector.Matches(labels.Set(p.Pod.Labels)) {count++}}}return count}复制代码
- 首先判断了 pod 是否设置了拓扑约束,如果设置了,直接就得0分。
- 如果没设置,就交给一个叫做 countMatchingPods 的函数来计算分数。计算方法就是看在一个 node 上,同命名空间下,并且不是处于删除状态的 pod,与当前要调度的 Pod 的 label 相同的有多少个,1个=1分,累积起来为当前节点的分。
第三阶段:NormalizeScore
这一阶段的作用为对 Score 阶段的得分做一个修正,修正后的结果才会作为节点的最终得分。具体代码如下。
func (pl *SelectorSpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {if skipSelectorSpread(pod) {return nil}countsByZone := make(map[string]int64, 10)maxCountByZone := int64(0)maxCountByNodeName := int64(0)for i := range scores {if scores[i].Score > maxCountByNodeName {maxCountByNodeName = scores[i].Score}nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)if err != nil {return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", scores[i].Name, err))}zoneID := utilnode.GetZoneKey(nodeInfo.Node())if zoneID == "" {continue}countsByZone[zoneID] += scores[i].Score}for zoneID := range countsByZone {if countsByZone[zoneID] > maxCountByZone {maxCountByZone = countsByZone[zoneID]}}haveZones := len(countsByZone) != 0maxCountByNodeNameFloat64 := float64(maxCountByNodeName)maxCountByZoneFloat64 := float64(maxCountByZone)MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)for i := range scores {// initializing to the default/max node score of maxPriorityfScore := MaxNodeScoreFloat64if maxCountByNodeName > 0 {fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-scores[i].Score) / maxCountByNodeNameFloat64)}// If there is zone information present, incorporate itif haveZones {nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)if err != nil {return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", scores[i].Name, err))}zoneID := utilnode.GetZoneKey(nodeInfo.Node())if zoneID != "" {zoneScore := MaxNodeScoreFloat64if maxCountByZone > 0 {zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)}fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)}}scores[i].Score = int64(fScore)}return nil}复制代码
- 首先还是判断了 pod 是否设置了拓扑约束,如果设置了,得分为0,这一点和上一阶段完全相同。
- 然后找出得分最高的节点和 zone
- 节点的最终得分为100 *(最大节点得分-当前节点得分)/ 最大节点得分,所以当前节点得分越高,最终得分就越低,这是符合尽量分散的预期的。
- 如果是 zone 的情况
- 当前 zone 的得分 = 100*(最大 zone 的得分-当前 zone 的得分)/最大 zone 的得分
- 最终节点维度得分 = 上一步的节点得分 * (1- zone 维度的权重)+ zone 维度的得分* zone 维度的权重
什么是zone?
这个概念在调度的过程中是为了适配某几个节点视为一个整体的情况,比如 nodeA 和 nodeB 同属于一个 zone, 那么在调度算法中,会在一定程度是同时考虑这两个节点是否存在相同副本。那么一个 node 到底属于哪个 zone 呢,我在源码中找到了这样一段代码。
func GetZoneKey(node *v1.Node) string {labels := node.Labelsif labels == nil {return ""}// TODO: prefer stable labels for zone in v1.18zone, ok := labels[v1.LabelZoneFailureDomain]if !ok {zone, _ = labels[v1.LabelZoneFailureDomainStable]}// TODO: prefer stable labels for region in v1.18region, ok := labels[v1.LabelZoneRegion]if !ok {region, _ = labels[v1.LabelZoneRegionStable]}if region == "" && zone == "" {return ""}// We include the null character just in case region or failureDomain has a colon// (We do assume there's no null characters in a region or failureDomain)// As a nice side-benefit, the null character is not printed by fmt.Print or glogreturn region + ":\x00:" + zone}复制代码
这说明两个 node 是否是在同一个 zone,取决于它的 label。
资源碎片问题怎么解决?
举个例子来说明一下这个问题,如同下图这样,我有两个节点分别剩余1GPU,但是当我想要创建一个 2GPU 的 pod 出来,却找不到能调度的节点。
实现方式
和调度器自身解决同一控制器下的 pod 尽量分散的方式一样,我们这个算法也是对 node 的打分进行干预。因此,可以很容易想到,把我们的算法插入进调度器就好了。那么这项工作该怎么进行呢?下面将介绍业内主流的方案。
扩展调度器的标准流程:Scheduling Framework
调度器扩展的方式可以总结为一下四种
- 直接改 kube-scheduler 的源码
- 额外部署完全独立的调度器
- 调度器扩展程序
- Scheduling Framework 前面三种已经不推荐使用了,因此这里只介绍最后一种。 还记得我们在文章开头的那两个控制循环吧,扩展的突破口,就在第二个循环内部。它可以放大成下面这张图。
www.qikqiak.com/post/custom…
- github.com/kubernetes-…
- time.geekbang.org/column/arti…
推荐阅读
Guava Cache实战—从场景使用到原理分析
详解 HTTP2.0 及 HTTPS 协议
初识 JVM(带你从不同的视角认识 JVM)
招贤纳士
政采云技术团队(Zero),一个富有激情、创造力和执行力的团队,Base 在风景如画的杭州。团队现有300多名研发小伙伴,既有来自阿里、华为、网易的“老”兵,也有来自浙大、中科大、杭电等校的新人。团队在日常业务开发之外,还分别在云原生、区块链、人工智能、低代码平台、中间件、大数据、物料体系、工程平台、性能体验、可视化等领域进行技术探索和实践,推动并落地了一系列的内部技术产品,持续探索技术的新边界。此外,团队还纷纷投身社区建设,目前已经是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等众多优秀开源社区的贡献者。如果你想改变一直被事折腾,希望开始折腾事;如果你想改变一直被告诫需要多些想法,却无从破局;如果你想改变你有能力去做成那个结果,却不需要你;如果你想改变你想做成的事需要一个团队去支撑,但没你带人的位置;如果你想改变本来悟性不错,但总是有那一层窗户纸的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望参与到随着业务腾飞的过程,亲手推动一个有着深入的业务理解、完善的技术体系、技术创造价值、影响力外溢的技术团队的成长过程,我觉得我们该聊聊。任何时间,等着你写点什么,发给 zcy-tc@cai-inc.com