源码基于k8s 1.9 release

源码目录结构

cmd/kube-controller-manager/app/core.go   // service Controller的启动代码,包含很多其它controller的启动

/pkg/controller/service
.
├── BUILD
├── doc.go
├── service_controller.go // service的核心代码逻辑
├── serrvice_controller_test.go    
├── OWNERS

首先看service的启动函数,startServiceController函数返回ServiceController结构体,主要使用到以下几类资源,cloud、service、node:
startServiceController
进入New函数,这部分就是在初始化serviceController,eventBroadcaster.NewRecorder记录”service-controller”的事件,cache中保存的是经过操作的service,主要是为了提供信息给删除操作。
new
AddEventHandlerWithResyncPeriod包装成listener,定义对象处理的回调函数,关注service资源的增删改操作,这里增删直接进入队列,而改操作经过needUpdate函数判定是否将更改后的service以namespace/name的字符串形式加入队列
fun
needsUpdate函数的判定逻辑如下(设定更改前的service为旧service,更改后的service为新service):
需要入队的情况,由上往下符合即入队返回:
1、旧service的.spec.type为LoadBalancer但新service不是;
2、新service的.spec.type为LoadBalancer且新旧service的loadBalancerSourceRanges不等;
3、新旧service的lb端口或.spec. sessionAffinity不等;
4、新旧service的.spec. loadBalancerIP不等;
5、新旧service的.spec. externalIPs的后端接受流量的节点数不一致(包括长度和节点ip);
6、新旧service的注解不一致,即. metadata. Annotations值不一致;
7、新旧service的. metadata.uid值不等;
8、新旧service的分流策略不一致,即.spec.externalTrafficPolicy不等(Local和Cluster模式);
9、新旧service的健康检测端口不一致,即.spec. healthCheckNodePort不等(只作用于.spec.type为LoadBalancer并且.spec.externalTrafficPolicy为Local模式);
不需要入队的情况:
1、不符合上述入队的情况;
2、旧service和新service的.spec.type均不为LoadBalancer;

所有初始化操作完成后,将进入init函数,需要提醒的是,如果集群未设置cloud-provider或设置不当等相关参数,service将返回如下的错误,而service controller并不会进入到实际的工作,最后提示Failed to start service controller: WARNING: no cloud provider provided, services of type LoadBalancer will fail或 Failed to start service controller: the cloud provider does not support external load balancers。
init
一般对于内部使用而言,我们一般不需要关注cloud provider,而kube-controller-manager的service controller到这里也将停止工作。但作为研究,还是准备将service controller的逻辑分析完善。
在成功New serviceController之后,将进入Run函数,这里面包含了serviceContrller的主要业务逻辑,这里我们主要看wait.Until(s.worker, time.Second, stopCh)的worker函数
run
进入到syncService函数,具体代码如下
syncService
将从queue中获取到的key,经过SplitMetaNamespaceKey函数得到service的namespace和name,然后调用serviceLister的方法获取service。进行以下逻辑:
1、如果该service已经不存在了,表明该service已经被删除,则将进行service的收尾工作,通过serviceContrller的cache变量获取该service,如果不存在,则返回;否则就调用processLoadBalancerDelete函数继续判断该service不是loadBalancerIP类型,则直接返回,否则调用serviceContrller中的balancer变量的EnsureLoadBalancerDeleted方法进行balance的删除,如果删除错误,返回错误和下次延迟重试时间,serviceContrller会根据这个延迟重试时间将该service加入到queue等待下一次的处理。所有操作无误后清理cache变量的service缓存,并将重试时间设置为0,这时表示该操作已经正确完成。
2、如果是其他未知错误,则将该key继续加入queue,等待下一次处理。
除了上述两种情况,剩下的就是update或add操作了,通过cache变量获取cachedService数据,进而通过processServiceUpdate函数,具体代码如下
processServiceUpdate
3.1、如果cachedService的state不为空,如果缓存的service的uid与获取的service的uid不等,不一致就调用processLoadBalancerDelete函数处理缓存的service,对其收尾工作。出错返回延迟重试时间,并将key重新插入queue。
3.2、更新cache中的service,通过createLoadBalancerIfNeeded判断是否为service生成loadbalance,其中createLoadBalancerIfNeeded的代码逻辑如下
createLoadBalancerIfNeeded
判断service的.spec.type是否为LoadBalancer,如果type不是LoadBalancer则判断loadbalance是否已经存在,如果存在就将该loadbalance删除。如果type是LoadBalancer将为该service生成loadbalance,最后再对比previousState和newState,如果两者不一样,则更新service的status。在逻辑判断的过程中,如果出错,都会将返回重试时间,并将该key再次写入queue等待再次处理该key。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐