Istio-PilotDiscovery服务的创建
info在本文中,将对pilotDiscovery服务创建流程进行源码分析具体代码注释请移至pilot-discovery是istio的注册发现中心,可以说它相当于k8s中的kube-apiserver与协调各个组件,相当于指挥部的存在.那么它具体有什么功能那?让我们来罗列一下(内容可能不全,还请补充)我们可以看到返回值是一个Server结构体,它包含了整个discovery运行过程中所需要的服务
在本文中,将对pilotDiscovery服务创建流程进行源码分析 具体代码注释请移至
https://gitee.com/meng_mengs_boys/istio_1.14.1_test
pilot-discovery是istio的注册发现中心,可以说它相当于k8s中的kube-apiserver与协调各个组件,相当于指挥部的存在.
那么它具体有什么功能那?让我们来罗列一下(内容可能不全,还请补充)
- 动态更新istiod的配置(使用CRD机制,创建文件监听器…,注意这里是istiod的配置不是istio资源的配置)
- 证书认证与管理
- istio资源监听,将istio资源转化为envoy能够识别的配置然后向envoy进行推送.
- 上游服务(POD)的服务注册
- 创建webhook服务器,主要提供自动注入功能
下面是对pilot-discovery的创建方法,这里可以大体有个印象接下来本文会详细讲解里面的方法对应的功能,比如40行 s.initKubeClient(args) 的作用就是创建k8s客户端,根据client.conf为istio中的每个资源创建informer(CRD).
// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
// 创建一个环境结构体.
//该结构的作用包含了XDS服务器,对当前网格的管理.
e := &model.Environment{
PushContext: model.NewPushContext(),
DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
}
// 创建注册中心控制器
ac := aggregate.NewController(aggregate.Options{
// 当前网格管理者
MeshHolder: e,
})
e.ServiceDiscovery = ac
s := &Server{
clusterID: getClusterID(args),
environment: e,
fileWatcher: filewatcher.NewWatcher(),
httpMux: http.NewServeMux(),
monitoringMux: http.NewServeMux(),
readinessProbes: make(map[string]readinessProbe),
workloadTrustBundle: tb.NewTrustBundle(nil),
server: server.New(),
shutdownDuration: args.ShutdownDuration,
internalStop: make(chan struct{}),
istiodCertBundleWatcher: keycertbundle.NewWatcher(),
}
// 这里为空
for _, fn := range initFuncs {
fn(s)
}
// 该属性是证书池,存储服务发现中的证书
e.TrustBundle = s.workloadTrustBundle
//创建XDS服务,并初始化它,比如为ConfigGenerator生成器初始化,初始化cache
s.XDSServer = xds.NewDiscoveryServer(e, args.PodName, args.RegistryOptions.KubeOptions.ClusterAliases)
prometheus.EnableHandlingTimeHistogram()
// 初始化kubeclient,并创建所有资源的informer连接。
if err := s.initKubeClient(args); err != nil {
return nil, fmt.Errorf("error initializing kube client: %v", err)
}
// 获取endpoint模式,是否为分片模式
args.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)
//初始化 网格网络配置信息、创建监听事件
/*
meshConfig ConfigMapInformer事件触发后迪奥哟用的handler
1.initMeshHandlers() 向envoy 发送通知
2.environment.Init().NewClusterLocalProvider().onMeshUpdated
更新environment.clusterLocalServices.hosts 本地host值
3.environment.InitNetworksManager(s.XDSServer).NewNetworkManager().NetworkManager.reloadAndPush()
*/
s.initMeshConfiguration(args, s.fileWatcher)
//创建信用域, 是SPIRE需要的配置 ,1.14中的新特性,默认值为cluster.local
spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
//这里根据参数 networksConfig 重新创建NetworksWatcher
s.initMeshNetworks(args, s.fileWatcher)
//在initMeshConfiguration为mesh配置创建了Infomer,这里就是添加事件函数
s.initMeshHandlers()
//设置只允许本集群访问的host
//本地环境相关内容初始化,这里只做了将本地集群访问可以访问的host存储起来
s.environment.Init()
//初始化network管理器
// 例子:
/*
networks:
network1:
endpoints:
- fromRegistry: registry1 #must match kubeconfig name in Kubernetes secret
- fromCidr: 192.168.100.0/22 #a VM network for example
gateways:
- registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
port: 15443
locality: us-east-1a
- address: 192.168.100.1
port: 15443
locality: us-east-1a
*/
if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {
return nil, err
}
// Ca配置
caOpts := &caOptions{
TrustDomain: s.environment.Mesh().TrustDomain,
Namespace: args.Namespace,
ExternalCAType: ra.CaExternalType(externalCaType),
CertSignerDomain: features.CertSignerDomain,
}
if caOpts.ExternalCAType == ra.ExtCAK8s {
// Older environment variable preserved for backward compatibility
caOpts.ExternalCASigner = k8sSigner
}
// 创建CA证书管理器,用于证书签发、验证等功能
if err := s.maybeCreateCA(caOpts); err != nil {
return nil, err
}
// 重点
//为路由规则、目的规则等CRD绑定增删改事件,原理采用informer机制。
// 为服务发现注册一个控制器,实现服务发现功能。
if err := s.initControllers(args); err != nil {
return nil, err
}
//初始化Envoy配置生成插件
//以map的形式存储插件
//在发送给envoy时,会将原本的istio结构体转换生成为envoy能够识别的配置
s.XDSServer.InitGenerators(e, args.Namespace)
// Initialize workloadTrustBundle after CA has been initialized
if err := s.initWorkloadTrustBundle(args); err != nil {
return nil, err
}
// 获取当前服务host
istiodHost, _, err := e.GetDiscoveryAddress()
if err != nil {
return nil, err
}
// Create Istiod certs and setup watches.
if err := s.initIstiodCerts(args, string(istiodHost)); err != nil {
return nil, err
}
// 创建有TLS的Grpc服务
if err := s.initSecureDiscoveryService(args); err != nil {
return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)
}
var wh *inject.Webhook
// common https server for webhooks (e.g. injection, validation)
if s.kubeClient != nil {
// 这里会创建安全的webhook服务器,以供K8sAdmission访问
s.initSecureWebhookServer(args)
// 添加/inject handler,在原有的pod基础上添加init container属性
//在MutatingAdmissionWebhooks时调用
wh, err = s.initSidecarInjector(args)
if err != nil {
return nil, fmt.Errorf("error initializing sidecar injector: %v", err)
}
//校验配置信息
//在validateadmission时调用
if err := s.initConfigValidation(args); err != nil {
return nil, fmt.Errorf("error initializing config validator: %v", err)
}
}
whc := func() map[string]string {
if wh != nil {
return wh.Config.RawTemplates
}
return map[string]string{}
}
// Used for readiness, monitoring and debug handlers.
if err := s.initIstiodAdminServer(args, whc); err != nil {
return nil, fmt.Errorf("error initializing debug server: %v", err)
}
// 为上面CRD资源注册事件触发回调函数
//作用为,通知监听的服务,向Envoy发送规则配置更改指令
s.initRegistryEventHandlers()
//创建Grpc服务器,主要提供服务注册调用的接口。
//其中的handler由proto直接生成,作用是接受请求,并将请求解析成WorkloadEntry资源,使用k8s客户端进行创建
//创建后会触发上面服务注册控制器
s.initDiscoveryService(args)
s.initSDSServer()
// 权限认证,先使用证书认证,如果成果就不会往下判断
authenticators := []security.Authenticator{
&authenticate.ClientCertAuthenticator{},
}
if args.JwtRule != "" {
jwtAuthn, err := initOIDC(args, s.environment.Mesh().TrustDomain)
if err != nil {
return nil, fmt.Errorf("error initializing OIDC: %v", err)
}
if jwtAuthn == nil {
return nil, fmt.Errorf("JWT authenticator is nil")
}
authenticators = append(authenticators, jwtAuthn)
}
// 这里会添加上jwt认证
authenticators = append(authenticators,
kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient, s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))
if features.XDSAuth {
s.XDSServer.Authenticators = authenticators
}
caOpts.Authenticators = authenticators
// 启动CA认证服务器
s.startCA(caOpts)
// TODO: don't run this if galley is started, one ctlz is enough
if args.CtrlZOptions != nil {
_, _ = ctrlz.Run(args.CtrlZOptions, nil)
}
// 启动所有资源informer
if s.kubeClient != nil {
s.addStartFunc(func(stop <-chan struct{}) error {
s.kubeClient.RunAndWait(stop)
return nil
})
}
//添加就绪探针
s.addReadinessProbe("discovery", func() (bool, error) {
return s.XDSServer.IsServerReady(), nil
})
return s, nil
}
我们可以看到返回值是一个Server结构体,它包含了整个discovery运行过程中所需要的服务,配置信息,那么就让我们看一下它都有哪些属性!
Server配置
对于discovery中所有包含的服务、配置都存储在pilot\pkg\bootstrap\server.go:Server 结构体中,那么它究竟包含了哪些属性那?让我们一探究竟!
type Server struct {
//创建XDS服务, XDS是与envoy通讯的服务器,istio通过它与envoy进行通讯
XDSServer *xds.DiscoveryServer
// 判断要使用的注册集群,比如使用的MCP模式或者使用k8s
clusterID cluster.ID
//记录了当前集群中的服务个数、规则配置
environment *model.Environment
//kubeClient客户端
kubeClient kubelib.Client
//多集群控制器
multiclusterController *multicluster.Controller
// 配置管理中心,有MCP、K8s管理中心(informer)、内存管理中心
configController model.ConfigStoreController
//配置管理中心数组
ConfigStores []model.ConfigStoreController
//外部服务控制器,用于对serviceEntry信息进行存储,将其转化为instance
serviceEntryController *serviceentry.Controller
httpServer *http.Server // debug, monitoring and readiness Server.
httpsServer *http.Server // webhooks HTTPS Server.
httpsReadyClient *http.Client
// XDSServer其实是使用grpcserver跟envoy进行通讯
grpcServer *grpc.Server
grpcAddress string
secureGrpcServer *grpc.Server
secureGrpcAddress string
// monitoringMux listens on monitoringAddr(:15014).
// Currently runs prometheus monitoring and debug (if enabled).
monitoringMux *http.ServeMux
// httpMux listens on the httpAddr (8080).
// If a Gateway is used in front and https is off it is also multiplexing
// the rest of the features if their port is empty.
// Currently runs readiness and debug (if enabled)
httpMux *http.ServeMux
// httpsMux listens on the httpsAddr(15017), handling webhooks
// If the address os empty, the webhooks will be set on the default httpPort.
httpsMux *http.ServeMux // webhooks
// MultiplexGRPC will serve gRPC and HTTP (1 or 2) over the HTTPListener, if enabled.
MultiplexGRPC bool
// fileWatcher used to watch mesh config, networks and certificates.
// 文件监听器,监听mesh配置与networks,certificates配置
fileWatcher filewatcher.FileWatcher
// certWatcher watches the certificates for changes and triggers a notification to Istiod.
//证书监听器
cacertsWatcher *fsnotify.Watcher
dnsNames []string
//证书管理器
certController *chiron.WebhookController
//CA证书
CA *ca.IstioCA
RA ra.RegistrationAuthority
// TrustAnchors for workload to workload mTLS
workloadTrustBundle *tb.TrustBundle
certMu sync.RWMutex
istiodCert *tls.Certificate
istiodCertBundleWatcher *keycertbundle.Watcher
// 对于server中的组件进行存储起来,在start时进行启动
server server.Instance
readinessProbes map[string]readinessProbe
// duration used for graceful shutdown.
shutdownDuration time.Duration
// internalStop is closed when the server is shutdown. This should be avoided as much as possible, in
// favor of AddStartFunc. This is only required if we *must* start something outside of this process.
// For example, everything depends on mesh config, so we use it there rather than trying to sequence everything
// in AddStartFunc
internalStop chan struct{}
//状态通知
statusReporter *distribution.Reporter
//状态管理
statusManager *status.Manager
// RWConfigStore is the configstore which allows updates, particularly for status.
RWConfigStore model.ConfigStoreController
}
如果我们将Server中的属性按照功能分类的话与我们上面罗列的discovery的功能基本对应,那么本文将按照功能模块对NewServer(也就是pilot-discovery服务的创建)进行分析
Server的创建
既然是返回Server那么首先应该先创建它
// 创建一个环境结构体.
//该结构的作用包含了XDS服务器,对当前网格的管理.
e := &model.Environment{
PushContext: model.NewPushContext(),
DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
}
e.SetLedger(buildLedger(args.RegistryOptions))
// 创建注册中心控制器
ac := aggregate.NewController(aggregate.Options{
// 当前网格管理者
MeshHolder: e,
})
e.ServiceDiscovery = ac
s := &Server{
clusterID: getClusterID(args),
environment: e,
fileWatcher: filewatcher.NewWatcher(),
httpMux: http.NewServeMux(),
monitoringMux: http.NewServeMux(),
readinessProbes: make(map[string]readinessProbe),
workloadTrustBundle: tb.NewTrustBundle(nil),
server: server.New(),
shutdownDuration: args.ShutdownDuration,
internalStop: make(chan struct{}),
istiodCertBundleWatcher: keycertbundle.NewWatcher(),
}
// 这里为空,传值为nil
for _, fn := range initFuncs {
fn(s)
}
// 该属性是证书池,存储服务发现中的证书,好像提供mTLS功能
e.TrustBundle = s.workloadTrustBundle
Environment的创建
我们从3-6行可以看到创建了一个Environment结构体,为两个属性赋了值,这个时候疑问就出来了,Environment结构体有什么用,这里面的两个属性为什么要这么赋值?那么接下来让我们一一讲解!
type Environment struct {
// Discovery interface for listing services and instances.
// istio会将ServiceEntry和workloadEntry解析成ServiceInstance结构体然后进行存储到注册表中
// 而IstioService管理器主要是管理注册表,比如获取当前集群中的所有服务等等
// 默认为
ServiceDiscovery
// 管理istio资源配置,比如获取istio中的资源配置信息,默认使用的CRDInformer获取k8s中的资源
ConfigStore
// Watcher is the watcher for the mesh config (to be merged into the config store)
// 这是对mesh配置的监听,mesh配置默认在cm中的istio配置中
mesh.Watcher
//对mesh配置中的Network进行监听
NetworksWatcher mesh.NetworksWatcher
//对mesh配置中的Network进行管理
NetworkManager *NetworkManager
//一个上下文信息,记录了当前集群中的所有配置信息,比如virtualService,destinationRule等
//为什么会记录所有那?因为istio是一个全量推送(当然有一些优化但是这些基本的元素还是全量)
//所以会记录所有的信息,在每个推送会话过程中会使用到
PushContext *PushContext
// istio服务的后缀,默认为cluster.local
DomainSuffix string
ledger ledger.Ledger
// TrustBundle: List of Mesh TrustAnchors
// 与mesh配置相关的TrustAnchors
TrustBundle *trustbundle.TrustBundle
// 本地集群中的一些服务信息,做一些比如namespaces为kube-system的服务,不能动的操作等
clusterLocalServices ClusterLocalProvider
//网关api管理器
GatewayAPIController GatewayController
}
e.SetLedger(buildLedger(args.RegistryOptions))
// 创建注册中心控制器
ac := aggregate.NewController(aggregate.Options{
// 当前网格管理者
MeshHolder: e,
})
e.ServiceDiscovery = ac
对于注册中心控制器,我们不需要太多关注,它主要是将每个注册中心进行整合起来\
PushContext
这里重点讲解一下PushContext,因为当配置发生配置之后,istio会将其进行envoy配置的生成.而envoy配置的生成很有可能是全部生成,这时候就需要用到所有的资源,如果每操作一次都获取全部的资源是非常麻烦的,所以istio创建了一个PushContext,该上下文包含了所有资源的信息,当调用xdsservice.configUpdate()方法后(下篇start时会讲到)会根据全局的PushContext(也就是当前的这个)拷贝一个信息的PushContext进行接下来的操作.
Mesh创建与监听
Server创建完成后,继续往下走我们可以看到好几个对mesh,Networks的初始化等方法,他们所提供的功能是监听mesh配置,根据mesh配置的变动进行推送。
// 获取endpoint模式,是否为分片模式
args.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)
//初始化 网格网络配置信息、创建监听事件
/*
meshConfig ConfigMapInformer事件触发后迪奥哟用的handler
1.initMeshHandlers() 向envoy 发送通知
2.environment.Init().NewClusterLocalProvider().onMeshUpdated
更新environment.clusterLocalServices.hosts 本地host值
3.environment.InitNetworksManager(s.XDSServer).NewNetworkManager().NetworkManager.reloadAndPush()
*/
s.initMeshConfiguration(args, s.fileWatcher)
//创建信用域, 是SPIRE需要的配置 ,1.14中的新特性,默认值为cluster.local
spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
//这里根据参数 networksConfig 重新创建NetworksWatcher
s.initMeshNetworks(args, s.fileWatcher)
//在initMeshConfiguration为mesh配置创建了Infomer,这里就是添加事件函数
s.initMeshHandlers()
//设置只允许本集群访问的host
//本地环境相关内容初始化,这里只做了将本地集群访问可以访问的host存储起来
s.environment.Init()
//初始化network管理器
// 例子:
/*
networks:
network1:
endpoints:
- fromRegistry: registry1 #must match kubeconfig name in Kubernetes secret
- fromCidr: 192.168.100.0/22 #a VM network for example
gateways:
- registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
port: 15443
locality: us-east-1a
- address: 192.168.100.1
port: 15443
locality: us-east-1a
*/
if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {
return nil, err
}
可能大家对这块有点糊涂(反正我是这样)其实这块很好理解,它的基本流程就是,通过file或者Informer对mesh配置进行监听,如果有变动,那就推送给Envoy.
证书认证
对于证书这块代码有些长,所以请细心查看
// 该属性是证书池,存储服务发现中的证书
e.TrustBundle = s.workloadTrustBundle
//创建信用域, 是SPIRE需要的配置 ,1.14中的新特性,默认值为cluster.local
spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
// Ca配置
caOpts := &caOptions{
TrustDomain: s.environment.Mesh().TrustDomain,
Namespace: args.Namespace,
ExternalCAType: ra.CaExternalType(externalCaType),
CertSignerDomain: features.CertSignerDomain,
}
if caOpts.ExternalCAType == ra.ExtCAK8s {
// Older environment variable preserved for backward compatibility
caOpts.ExternalCASigner = k8sSigner
}
// 创建CA证书管理器,用于证书签发、验证等功能
if err := s.maybeCreateCA(caOpts); err != nil {
return nil, err
}
// Initialize workloadTrustBundle after CA has been initialized
if err := s.initWorkloadTrustBundle(args); err != nil {
return nil, err
}
// Create Istiod certs and setup watches.
// 这里生成istiod的证书,主要用于webhook与安全Grpc
if err := s.initIstiodCerts(args, string(istiodHost)); err != nil {
return nil, err
}
// 创建有TLS的Grpc服务
if err := s.initSecureDiscoveryService(args); err != nil {
return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)
}
//Secret discovery service,envoy通过SDS向pilot-agent发送证书和密钥请求
//在收到 SDS 请求后,istio-agent 创建私钥和 CSR,然后将 CSR 及其凭据发送到 istiod CA 进行签名。
//istiod CA 验证 CSR 中携带的凭据,成功验证后签署 CSR 以生成证书。
//Istio-agent 通过 Envoy SDS API 将私钥和从 Istio CA 收到的证书发送给 Envoy。
//Istio-agent 会监工作负载证书的有效期。上述 CSR 过程会周期性地重复,以处理证书和密钥轮换。
s.initSDSServer()
authenticators := []security.Authenticator{
&authenticate.ClientCertAuthenticator{},
}
if args.JwtRule != "" {
jwtAuthn, err := initOIDC(args, s.environment.Mesh().TrustDomain)
if err != nil {
return nil, fmt.Errorf("error initializing OIDC: %v", err)
}
if jwtAuthn == nil {
return nil, fmt.Errorf("JWT authenticator is nil")
}
authenticators = append(authenticators, jwtAuthn)
}
// 这里会添加上jwt认证
authenticators = append(authenticators,
kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient, s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))
if features.XDSAuth {
s.XDSServer.Authenticators = authenticators
}
caOpts.Authenticators = authenticators
// 启动CA认证服务器,向grpc注册创建ca证书handler
s.startCA(caOpts)
// TODO: don't run this if galley is started, one ctlz is enough
if args.CtrlZOptions != nil {
_, _ = ctrlz.Run(args.CtrlZOptions, nil)
}
对于证书的签证与管理有两步
- 管理每个工作负载,使用插入CA证书,管理员可以在运行参数中静态提供CA证书的地址,如果没有提供的话默认istiod会创建一个自签名的根证书和密钥,并使用它们来签署工作负载证书。
- 管理DNS证书,istio使用 Chiron 配置和管理 DNS 证书,Chiron 是一个与 Istiod 相连的轻量型组件,使用 Kubernetes 的 CA API 签发证书.
DiscoveryServer
接下里就开始本文的重点DiscoveryServer的创建,该结构体定义了对istio中资源的操作反馈,可以说它完成了istio-Discovery最核心的功能
创建XDS服务, XDS是与envoy通讯的服务器,istio通过它与envoy进行通讯
//创建XDS服务
s.XDSServer = xds.NewDiscoveryServer(e, args.PodName, args.RegistryOptions.KubeOptions.ClusterAliases)
// 重点
//为路由规则、目的规则等CRD绑定增删改事件,原理采用informer机制。
// 为服务发现注册一个控制器,实现服务发现功能。
if err := s.initControllers(args); err != nil {
return nil, err
}
//初始化Envoy配置生成插件
//以map的形式存储插件
//在发送给envoy时,会将原本的istio结构体转换生成为envoy能够识别的配置
s.XDSServer.InitGenerators(e, args.Namespace)
// 为上面CRD资源注册事件触发回调函数
//作用为,通知监听的服务,向Envoy发送规则配置更改指令
s.initRegistryEventHandlers()
//创建Grpc服务器,主要提供服务注册调用的接口。
//其中的handler由proto直接生成,作用是接受请求
//创建后会触发上面服务注册控制器
s.initDiscoveryService(args)
结构
既然它这么重要,就让我们看看他的结构吧!
// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
type DiscoveryServer struct {
// 与server中的env一致
Env *model.Environment
// MemRegistry is used for debug and load testing, allow adding services. Visible for testing.
// 创建debug,测试使用的服务
MemRegistry *memory.ServiceDiscovery
// 配置生成器
ConfigGenerator core.ConfigGenerator
// 为每个Envoy配置添加对应的配置生成器,有的配置会使用到上面的配置生成器,而有的不会使用过
Generators map[string]model.XdsResourceGenerator
// ProxyNeedsPush is a function that determines whether a push can be completely skipped. Individual generators
// may also choose to not send any updates.
ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) bool
// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.
concurrentPushLimit chan struct{}
// requestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering hurd of incoming requests.
requestRateLimit *rate.Limiter
// InboundUpdates describes the number of configuration updates the discovery server has received
InboundUpdates *atomic.Int64
// CommittedUpdates describes the number of configuration updates the discovery server has
// received, process, and stored in the push context. If this number is less than InboundUpdates,
// there are updates we have not yet processed.
// Note: This does not mean that all proxies have received these configurations; it is strictly
// the push context, which means that the next push to a proxy will receive this configuration.
CommittedUpdates *atomic.Int64
// IstioEndpoint分片存储器,根据服务名与命名空间存储
EndpointIndex *model.EndpointIndex
// pushChannel is the buffer used for debouncing.
// after debouncing the pushRequest will be sent to pushQueue
pushChannel chan *model.PushRequest
// mutex used for protecting Environment.PushContext
updateMutex sync.RWMutex
// pushQueue is the buffer that used after debounce and before the real xds push.
pushQueue *PushQueue
// debugHandlers is the list of all the supported debug handlers.
debugHandlers map[string]string
// adsClients reflect active gRPC channels, for both ADS and EDS.
// 存储了istio与每个envoy创建的conn
adsClients map[string]*Connection
adsClientsMutex sync.RWMutex
StatusReporter DistributionStatusCache
// Authenticators for XDS requests. Should be same/subset of the CA authenticators.
// 请求身份验证集合
Authenticators []security.Authenticator
// StatusGen is notified of connect/disconnect/nack on all connections
StatusGen *StatusGen
// WorkloadEntry控制器,作用是对每个envoy进行健康检测
WorkloadEntryController *workloadentry.Controller
// serverReady indicates caches have been synced up and server is ready to process requests.
// 标识缓存已经同步完成
serverReady atomic.Bool
// 去抖策略
debounceOptions debounceOptions
// 当前istiod所对应的podName
instanceID string
// 缓存XDS资源
Cache model.XdsCache
// jwt解析器
JwtKeyResolver *model.JwksResolver
// ListRemoteClusters collects debug information about other clusters this istiod reads from.
ListRemoteClusters func() []cluster.DebugInfo
// ClusterAliases are aliase names for cluster. When a proxy connects with a cluster ID
// and if it has a different alias we should use that a cluster ID for proxy.
// 集群别名
ClusterAliases map[cluster.ID]cluster.ID
}
上面有些属性没有进行翻译,因为都是些通俗易懂的配置,如果不了解请查阅istio启动配置参数进行了解.
initControllers
它的主要功能有两个
- 根据注册中心创建资源监听事件,比如使用k8s注册中心,那么为每个资源创建informer监听事件
- 为serviceEntry,workloadEntry创建控制器主要存储存储了服务注册时创建的serviceEntry,workloadEntry信息
在该方法中进行了注册中心的判断,如果我们使用默认的配置话使用的是k8s作为注册中心.
// initControllers initializes the controllers.
func (s *Server) initControllers(args *PilotArgs) error {
log.Info("initializing controllers")
// 集群原生资源管理
s.initMulticluster(args)
// Certificate controller is created before MCP controller in case MCP server pod
// waits to mount a certificate to be provisioned by the certificate controller.
// 将mesh配置中的Certificate,生成密钥对并获取由 K8s_CA 签名的公共证书
// 选项用于签署 DNS 证书,然后创建k8s secret资源
if err := s.initCertController(args); err != nil {
return fmt.Errorf("error initializing certificate controller: %v", err)
}
//为istio中的CRD资源创建informer并绑定事件函数,比如虚拟路由规则、目标地址规则等
if err := s.initConfigController(args); err != nil {
return fmt.Errorf("error initializing config controller: %v", err)
}
//创建服务注册控制器,里面绑定了Serivceentry和workloadEntry资源的事件函数
if err := s.initServiceControllers(args); err != nil {
return fmt.Errorf("error initializing service controllers: %v", err)
}
return nil
}
Controller结构体
老规矩先看一下Controller结构体
// Controller communicates with ServiceEntry CRDs and monitors for changes.
type Controller struct {
// 向XDS客户端,主要向envoy推送信息
XdsUpdater model.XDSUpdater
// 获取资源客户端,比如client-go
store model.ConfigStore
//集群ID k8s
clusterID cluster.ID
// This lock is to make multi ops on the below stores. For example, in some case,
// it requires delete all instances and then update new ones.
// 资源锁
mutex sync.RWMutex
// 存储serviceInstances,由ServiceEntry转换
serviceInstances serviceInstancesStore
// NOTE: historically, one index for both WorkloadEntry(s) and Pod(s);
// beware of naming collisions
// 存储workloadInstances由workloadEntry转换
workloadInstances workloadinstances.Index
//所有Service
services serviceStore
// to make sure the eds update run in serial to prevent stale ones can override new ones
// There are multiple threads calling edsUpdate.
// If all share one lock, then all the threads can have an obvious performance downgrade.
// 将所有更新操作串行化
edsQueue queue.Instance
workloadHandlers []func(*model.WorkloadInstance, model.Event)
// 用于根据工作负载 ip 和标签获取网络 ID 的回调函数。
networkIDCallback func(IP string, labels labels.Instance) network.ID
processServiceEntry bool
model.NetworkGatewaysHandler
}
initMulticluster
初始化多集群(英文直译),监听每个集群中的资源,pod,namespaces,endpoint,service,根据这些资源的事件来更新envoy中的endpoint
具体的更新流程为
- 创建一个POD,触发更新事件
- 根据POD的命名空间与名称查找endpoints中是否有信息
- 如果找到获取endpoints信息,将其转换为IstioEndpoints 存入缓存中并推送给envoy
- 如果没有找到则结束.
看到这里我们应该对它的功能有所熟悉了,它就是将原生的service也添加到内部网络中,这就是istio兼容k8s原生服务的原理
接下来让我们看一下它的创建过程
func (s *Server) initMulticluster(args *PilotArgs) {
if s.kubeClient == nil {
return
}
// 创建secert informer,主要监听每个集群的客户端证书
s.multiclusterController = multicluster.NewController(s.kubeClient, args.Namespace, s.clusterID)
//根据secert证书获取集群同步状态
s.XDSServer.ListRemoteClusters = s.multiclusterController.ListRemoteClusters
// 启动每个集群
s.addStartFunc(func(stop <-chan struct{}) error {
return s.multiclusterController.Run(stop)
})
}
- 第6行,其实是创建了secert的informer监听,主要监听每个集群的客户端证书,然后会根据secert的添加(相当于集群的注册)调用方法,将该集群的原生资源解析(比如说Service资源那么就转换成istioService并推送给上游服务)
- 第8行主要判断当前集群组同步状态,当client.RunAndWait(client-go里的方法,判断当前informer等缓存同步)成功后完成集群同步状态
- 第10-12行是开启本地集群,并运行远程集群注册informer,本地集群的运行在下篇启动时讲解
initConfigController
对于initConfigController的详细介绍请移至https://blog.csdn.net/a1023934860/article/details/125787691?spm=1001.2014.3001.5502
initServiceControllers
它的功能是注册服务发现控制器,其主要功能是存储由serviceEntry,workloadEntry转换成的ServiceInstance.
其主要的功能为为serviceEntry,workloadEntry注册了监听触发事件
// NewController creates a new ServiceEntry discovery service.
func NewController(configController model.ConfigStoreController, store model.ConfigStore, xdsUpdater model.XDSUpdater,
options ...Option) *Controller {
//创建控制器,里面存储了pod注册时创建的serviceEntry,workloadEntry信息
s := newController(store, xdsUpdater, options...)
if configController != nil {
//为serviceEntry Informer添加handler方法
configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)
// 同理为workloadEntry informer添加handler方法
configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
_ = configController.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(s.clusterID))
}
return s
}
这两个handler会在下一篇Start讲解
WebHook服务创建与自动注入
我们在使用的过程中一般都会自动让其注入代理,那么就让我们看看webhook的注入吧.
var wh *inject.Webhook
// common https server for webhooks (e.g. injection, validation)
if s.kubeClient != nil {
// 这里会创建安全的webhook服务器,以供K8sAdmission访问
s.initSecureWebhookServer(args)
// 添加/inject handler,在原有的pod基础上添加init container属性
//在MutatingAdmissionWebhooks时调用
wh, err = s.initSidecarInjector(args)
if err != nil {
return nil, fmt.Errorf("error initializing sidecar injector: %v", err)
}
//校验配置信息
//在validateadmission时调用
if err := s.initConfigValidation(args); err != nil {
return nil, fmt.Errorf("error initializing config validator: %v", err)
}
}
首先第5行创建了一个基于istio证书认证的http2.0服务器.下面第8行才是重点让我们直接进入initSidecarInjector一探究竟
initSidecarInjector
func (s *Server) initSidecarInjector(args *PilotArgs) (*inject.Webhook, error) {
// currently the constant: "./var/lib/istio/inject"\
// 根据inject路径获取inject配置文件
injectPath := args.InjectionOptions.InjectionDirectory
if injectPath == "" || !injectionEnabled.Get() {
log.Infof("Skipping sidecar injector, injection path is missing or disabled.")
return nil, nil
}
// If the injection config exists either locally or remotely, we will set up injection.
// 这里会判断如果本地没有则会去k8s中查询Cm
var watcher inject.Watcher
if _, err := os.Stat(filepath.Join(injectPath, "config")); !os.IsNotExist(err) {
configFile := filepath.Join(injectPath, "config")
valuesFile := filepath.Join(injectPath, "values")
watcher, err = inject.NewFileWatcher(configFile, valuesFile)
if err != nil {
return nil, err
}
} else if s.kubeClient != nil {
configMapName := getInjectorConfigMapName(args.Revision)
cms := s.kubeClient.CoreV1().ConfigMaps(args.Namespace)
if _, err := cms.Get(context.TODO(), configMapName, metav1.GetOptions{}); err != nil {
if errors.IsNotFound(err) {
log.Infof("Skipping sidecar injector, template not found")
return nil, nil
}
return nil, err
}
watcher = inject.NewConfigMapWatcher(s.kubeClient, args.Namespace, configMapName, "config", "values")
} else {
log.Infof("Skipping sidecar injector, template not found")
return nil, nil
}
log.Info("initializing sidecar injector")
parameters := inject.WebhookParameters{
Watcher: watcher,
Env: s.environment,
Mux: s.httpsMux,
Revision: args.Revision,
}
// 添加webhook handler
// 添加inject方法
wh, err := inject.NewWebhook(parameters)
}
//进入NewWebhook
// NewWebhook creates a new instance of a mutating webhook for automatic sidecar injection.
func NewWebhook(p WebhookParameters) (*Webhook, error) {
if p.Mux == nil {
return nil, errors.New("expected mux to be passed, but was not passed")
}
wh := &Webhook{
watcher: p.Watcher,
meshConfig: p.Env.Mesh(),
env: p.Env,
revision: p.Revision,
}
// 添加自动注入方法,我们在创建资源后apiserver会根据webhook配置调用该handler对POD资源进行填充
p.Mux.HandleFunc("/inject", wh.serveInject)
p.Mux.HandleFunc("/inject/", wh.serveInject)
p.Env.Watcher.AddMeshHandler(func() {
wh.mu.Lock()
wh.meshConfig = p.Env.Mesh()
wh.mu.Unlock()
})
return wh, nil
}
对于serveInject方法,主要对POD添加了init,istio-proxy等信息.
还有一个方法没有讲到第171行s.initDiscoveryService(args),该方法是创建一个Grpc服务器,主要供上游注册时访问,该服务器注册了服务注册的方法,具体流程请移至下篇文章Istio-PilotDiscovery服务的启动
总结
下面是一些需要注意的地方
- 它里面有五个控制器ServiceDiscovery,configStroe,XDSService,serviceEntryController,WorkloadEntryController
这五个控制器基本上完成了大部分功能让我们看一下他们具体的功能
ServiceDiscovery,服务管理器目的是整合所有的注册控制器,向外提供统一的接口,比如获取上游服务,它会遍历所有的注册中心然后将其整合在一起返回
configStroe,配置管理中心,对于使用k8s注册中心来说,它就是一个client-go客户端,对每个资源创建informe并使用informer缓存所有的资源信息,供系统调用
XDSService,主要保存了每个上游服务的conn,身份验证集合,推送管道,接受管道,envoy配置生成器,可以理解成它保存了istio与envoy通讯之间的一系列操作.
serviceEntryController,k8s服务注册器,主要存储当前集群的上游服务,并根据上游服务的命名空间等过滤条件进行存储
WorkloadEntryController,上游服务管理器,主要对上游服务进行健康检查等操作.
在这几个控制器中,都有缓存的影子存在,但是每个缓存的作用不太一样
在XDSService中有EndpointIndex(根据serviceName与namespaces存储Istioendpoint),作用是判断当前更新的istioendpoint是否需要进行全量更新,如果不是则进行增量更新.
ServiceDiscovery主要存储了serviceInstance与workloadInstance结构,主要记录了当前注册中心中的服务.
configStroe缓存,如果默认使用的是k8s注册中心,则它的缓存就是informer(CRD)中的缓存机制
Handler
istio的代码有些绕,主要表现在handler,它会时不时add一下,所以最后都不知道handler具体调用了哪些.
接下来让我们罗列一下在修改资源时调用了哪些handler,这些handler都有哪些功能
更多推荐
所有评论(0)