Istio服务发现-资源事件
通过前面几篇文章,我们已经知道了istio服务注册的一个大体流程,但是对于每个资源在变动的时候是如何进行处理的,比如创建一个k8s原生service,istio在监听到事件后该怎么处理,是否需要对其进行转换然后推送到envoy中?让我们带着这些疑问开始对istio中的资源事件进行一一分析....
通过前面几篇文章,
我们已经知道了istio服务注册的一个大体流程,但是对于每个资源在变动的时候是如何进行处理的,比如创建一个k8s原生service,istio在监听到事件后该怎么处理,是否需要对其进行转换然后推送到envoy中?
让我们带着这些疑问开始对istio中的资源事件进行一一分析.
前序
在开始之前我们先讲解两个控制器,kubeController(在代码中叫kubeRegistry),serviceEntryController.
为什么要讲解这两个控制器?这两个控制器有什么作用那.
首先我们应该知道istio所使用的资源有两种一种是原生k8s资源(pod,service等),另一种是自定义资源(serviceentry,workloadentry等).那么上面两个控制器就是用来管理这两类资源.
kubeController用来管理,pod,namespaces,service,endpoint(k8s),node
serviceEntryController用来管理serviceentry,workloadentry
对于istio其他资源使用的是configStore管理的但是其内容简单这里不再赘述.
K8s原生资源
Pod
首先我们需要了解到istio中工作负载之间的通讯其实是envoy之间的通讯(工作负载数据会发送到envoy中,envoy会根据配置寻找目的IP然后进行转发),POD在创建的时候,内部proxy会注册到istio,istio会将envoy配置发送给proxy,这时候就可以访问其他的工作负载.
具体为,istio将pod转换成workloadInstance,然后调用m.serviceEntryController.WorkloadInstanceHandler
它的作用是将workloadInstance添加到serviceEntryController中的serviceEntryController.workloadInstances缓存中,然后获取serviceEntryController.ServiceEntry来计算选择器是否匹配,如果匹配则进行更新.
这里我们需要注意pod的workloadInstance添加到了serviceEntryController的workload缓存中,并没有添加到处理原生资源的kubeController的workload缓存中,原因是默认开启了serviceEntry标签选择器可以选择POD所以才将workloadInstance添加到该缓存中.而kubeController中并没有使用到POD的.
那么kubeController中的workload缓存又有什么用那?
在istio的设计当中,最初的时候开发者使用的pod作为工作负载,注册到中心,但是这样对于外部服务的管理无能为力,所以开发者们设计了workload资源来替代pod,一个workload代表一个工作负载,当我们引入外部服务的时候可以通过添加workload来实现.上面我们所说的添加到serviceEntryController的缓存中实现的是自定义服务资源访问内部POD资源.那么想要内部服务资源(service)访问外部资源又该如何操作那?kubeController中的workload缓存就实现这个功能,当我们创建workload资源后,会存储到此缓存中并根据该信息构建pod结构体(并不添加到k8s中),通过POD结构体获取原生的k8sServices然后根据标签选择器进行推送更新.
kubeController缓存了整个POD,主要用于endpoint构建时的信息.
func NewEndpointBuilder(c controllerInterface, pod *v1.Pod) *EndpointBuilder {
locality, sa, namespace, hostname, subdomain, ip := "", "", "", "", "", ""
var podLabels labels.Instance
if pod != nil {
// 获取区域信息
locality = c.getPodLocality(pod)
// 获取sa信息
sa = kube.SecureNamingSAN(pod)
// 获取pod标签
podLabels = pod.Labels
namespace = pod.Namespace
subdomain = pod.Spec.Subdomain
if subdomain != "" {
hostname = pod.Spec.Hostname
if hostname == "" {
hostname = pod.Name
}
}
ip = pod.Status.PodIP
}
dm, _ := kubeUtil.GetDeployMetaFromPod(pod)
out := &EndpointBuilder{
controller: c,
serviceAccount: sa,
locality: model.Locality{
Label: locality,
ClusterID: c.Cluster(),
},
tlsMode: kube.PodTLSMode(pod),
workloadName: dm.Name,
namespace: namespace,
hostname: hostname,
subDomain: subdomain,
}
networkID := out.endpointNetwork(ip)
out.labels = labelutil.AugmentLabels(podLabels, c.Cluster(), locality, networkID)
return out
}
具体流程请参考:
// 原生POD注册到serviceEntryController实现serviceEntry选择POD的功能
kubeRegistry.AppendWorkloadHandler(m.serviceEntryController.WorkloadInstanceHandler)
// WorkloadEntry 注册到原生kubeController 实现原生service选择 WorkloadEntry(外部服务)的功能
m.serviceEntryController.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
Service
总所周知,当我们创建service后,kube-controller中的endpoint controller会根据其信息创建相应的endpoint,istio就使用这一机制,监听service事件,根据service信息获取所对应的endpoint,然后构建自己的istio-endpoint与model.service然后更新缓存,推送service更新命令.
**在buildEndpointsForService方法中,我们需要注意一下collectWorkloadInstanceEndpoints这个方法,它的作用是根据serivce资源中select标签选择器去serviceEntryController.WorkloadInstance(远程服务缓存)**匹配正确的元素,将这些匹配的元素的endpoint联合之前的原生endpoint一起更新到缓存中,缓存的存储形式为 []*IstioEndpoint.
下面让我们进入代码去一探究竟.
// 这里创建service的informer,注意后面的informer()会创建一个默认带有监听事件的informer
c.serviceInformer = filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Services().Informer())
c.serviceLister = listerv1.NewServiceLister(c.serviceInformer.GetIndexer())
// 这里注册handler
c.registerHandlers(c.serviceInformer, "Services", c.onServiceEvent, nil)
func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
// 将当前event转换成corev1.service{}
svc, err := convertToService(curr)
// 转化成istio.service(model.service)
svcConv := kube.ConvertService(*svc, c.opts.DomainSuffix, c.Cluster())
switch event {
case model.EventDelete:
c.deleteService(svcConv)
default:
c.addOrUpdateService(svc, svcConv, event, false)
}
return nil
}
// 这里我们只看添加操作
func (c *Controller) addOrUpdateService(svc *v1.Service, svcConv *model.Service, event model.Event, updateEDSCache bool) {
c.servicesMap[svcConv.Hostname] = svcConv
ns := svcConv.Attributes.Namespace
//这里根据K8sservice获取K8sendpoint然后转换为IstioEndpoint
endpoints := c.buildEndpointsForService(svcConv, updateEDSCache)
if len(endpoints) > 0 {
// 这里进行缓存更新
c.opts.XDSUpdater.EDSCacheUpdate(shard, string(svcConv.Hostname), ns, endpoints)
}
// 主要用于svc删除的时候,删除endpoint,对于添加只进行监控处理
c.opts.XDSUpdater.SvcUpdate(shard, string(svcConv.Hostname), ns, event)
// 主要向上游推送service更新命令
c.handlers.NotifyServiceHandlers(svcConv, event)
}
Endpoint
endpoint会转换成istioendpoint,然后根据hostname获取service,再通过service获取到hostname能够到达的所有endpoint,接着更新缓存(方式与上面service一致)推送service命令.
func updateEDS(c *Controller, epc kubeEndpointsController, ep interface{}, event model.Event) {
namespacedName := epc.getServiceNamespacedName(ep)
log.Debugf("Handle EDS endpoint %s %s %s in namespace %s", namespacedName.Name, event, namespacedName.Namespace)
var forgottenEndpointsByHost map[host.Name][]*model.IstioEndpoint
if event == model.EventDelete {
forgottenEndpointsByHost = epc.forgetEndpoint(ep)
}
shard := model.ShardKeyFromRegistry(c)
// 从name和Namespaced获取hostname
for _, hostName := range c.hostNamesForNamespacedName(namespacedName) {
var endpoints []*model.IstioEndpoint
if forgottenEndpointsByHost != nil {
endpoints = forgottenEndpointsByHost[hostName]
} else {
endpoints = epc.buildIstioEndpoints(ep, hostName)
}
// 通过hostname获取svc
svc := c.GetService(hostName)
if svc != nil {
// 与上面service中的该方法意思一致
// 都是获取匹配的workload
fep := c.collectWorkloadInstanceEndpoints(svc)
endpoints = append(endpoints, fep...)
} else {
log.Debugf("Handle EDS endpoint: skip collecting workload entry endpoints, service %s/%s has not been populated",
namespacedName.Namespace, namespacedName.Name)
}
// 更新eds
c.opts.XDSUpdater.EDSUpdate(shard, string(hostName), namespacedName.Namespace, endpoints)
}
}
Namespace
在mesh.DiscoverySelectors可以选择istio监控的命名空间,那么如果我们动态更改了当前配置,但是新加入的命名空间中的资源并不会触发informer所以我们需要手动进行调用触发每个资源的更新操作.
node
node主要监听带有traffic.istio.io/nodeSelector注释的service,如果没有则不进行监听.
Istio资源
对于istio资源我们只关心两个serviceEntry与workloadEntry,其余常用资源的监听事件是发送资源更新命令,让istiod重新生成配置推送给envoy.
serviceEntry
serviceEntry主要根据workload标签选择器以及address,hostname转换成model.ServiceInstance然后推送命令.
其中主要就是workload选择器,会获取当前workload缓存(这里面含有workload资源转换的还有pod转换成workload的)获取到其endpoint.
workloadEntry
当更新workloadentry资源后主要是更新workload缓存,根据workload标签获取匹配的ServiceEntry和k8s中的service.然后构建ServiceInstances向envoy推送更新命令.
更多推荐
所有评论(0)