client-go源码解析(一):client-go客户端对象
client-go源码解析(一):client-go客户端对象client-go的重要性client-go是对K8s集群的二次开发工具,所以client-go是k8s开发者的必备工具之一。本篇目的本篇的目的旨在教会大家k8s集群的客户端有哪些种类,大家可能之前都使用过client-go去操作k8s中的资源对象,但是对于其中原理可能一知半解,client-go github那边只是给出了简单的几个例
client-go源码解析(一):client-go客户端对象
client-go的重要性
client-go是对K8s集群的二次开发工具,所以client-go是k8s开发者的必备工具之一。
本篇目的
本篇的目的旨在教会大家k8s集群的客户端有哪些种类,大家可能之前都使用过client-go去操作k8s中的资源对象,但是对于其中原理可能一知半解,client-go github那边只是给出了简单的几个例子,没有太多深入讲解。本讲就先跟大家介绍下,client-go到底有几种客户端对象去和集群交互,这些客户端对象各自有什么用途,他们交互的原理是什么?
客户端对象
client-go的客户端对象有4个,作用各有不同:
- RESTClient: 是对HTTP Request进行了封装,实现了RESTful风格的API。其他客户端都是在RESTClient基础上的实现。可与用于k8s内置资源和CRD资源
- ClientSet:是对k8s内置资源对象的客户端的集合,默认情况下,不能操作CRD资源,但是通过client-gen代码生成的话,也是可以操作CRD资源的。
- DynamicClient:不仅能对K8S内置资源进行处理,还可以对CRD资源进行处理,不需要client-gen生成代码即可实现。
- DiscoveryClient:用于发现kube-apiserver所支持的资源组、资源版本、资源信息(即Group、Version、Resources)。
这小节总结下:RESTCLient、ClientSet和DynamicClient都可以对K8S内置资源和CRD资源进行操作。只是clientSet需要生成代码才能操作CRD资源。
RESTClient
RESTClient是对HTTP Request 进行了封装,是实现了RESTful风格的API封装,使得使用者在获取K8S资源时不需要使用原始API的方式(例如请求https://1.142.113.45:6443/api/v1/pods这样的原始形式)。 ClientSet的客户端使得代码更有阅读性,更易使用。下面举个例子:
func GetRestClient(kubeconfig string) (error, *rest.RESTClient){
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return err, nil
}
config.APIPath = "api"
config.GroupVersion = &corev1.SchemeGroupVersion
config.NegotiatedSerializer = scheme.Codecs
restClient, err := rest.RESTClientFor(config)
if err != nil {
return err, nil
}
return nil, restClient
}
这边是先给了kubeconfig文件,然后clientcmd.BuildConfigFromFlags("", kubeconfig)加载配置文件并生成config对象,看下ClientConfig对象的生成:
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
if kubeconfigPath == "" && masterUrl == "" {
klog.Warningf("Neither --kubeconfig nor --master was specified. Using the inClusterConfig. This might not work.")
kubeconfig, err := restclient.InClusterConfig()
if err == nil {
return kubeconfig, nil
}
klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
}
return NewNonInteractiveDeferredLoadingClientConfig(
&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}
这个ClientConfig()函数负责将Kuebconfig文件加载后的对象NewNonInteractiveDeferredLoadingClientConfig转换成restclient.Config对象:
func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error) {
// check that getAuthInfo, getContext, and getCluster do not return an error.
// Do this before checking if the current config is usable in the event that an
// AuthInfo, Context, or Cluster config with user-defined names are not found.
// This provides a user with the immediate cause for error if one is found
configAuthInfo, err := config.getAuthInfo()
if err != nil {
return nil, err
}
_, err = config.getContext()
if err != nil {
return nil, err
}
configClusterInfo, err := config.getCluster()
if err != nil {
return nil, err
}
if err := config.ConfirmUsable(); err != nil {
return nil, err
}
clientConfig := &restclient.Config{}
clientConfig.Host = configClusterInfo.Server
if len(config.overrides.Timeout) > 0 {
timeout, err := ParseTimeout(config.overrides.Timeout)
if err != nil {
return nil, err
}
clientConfig.Timeout = timeout
}
if u, err := url.ParseRequestURI(clientConfig.Host); err == nil && u.Opaque == "" && len(u.Path) > 1 {
u.RawQuery = ""
u.Fragment = ""
clientConfig.Host = u.String()
}
if len(configAuthInfo.Impersonate) > 0 {
clientConfig.Impersonate = restclient.ImpersonationConfig{
UserName: configAuthInfo.Impersonate,
Groups: configAuthInfo.ImpersonateGroups,
Extra: configAuthInfo.ImpersonateUserExtra,
}
}
// only try to read the auth information if we are secure
if restclient.IsConfigTransportTLS(*clientConfig) {
var err error
var persister restclient.AuthProviderConfigPersister
if config.configAccess != nil {
authInfoName, _ := config.getAuthInfoName()
persister = PersisterForUser(config.configAccess, authInfoName)
}
userAuthPartialConfig, err := config.getUserIdentificationPartialConfig(configAuthInfo, config.fallbackReader, persister)
if err != nil {
return nil, err
}
mergo.MergeWithOverwrite(clientConfig, userAuthPartialConfig)
serverAuthPartialConfig, err := getServerIdentificationPartialConfig(configAuthInfo, configClusterInfo)
if err != nil {
return nil, err
}
mergo.MergeWithOverwrite(clientConfig, serverAuthPartialConfig)
}
return clientConfig, nil
}
clientConfig就是最后生成的config对象。这边使用给出的kubeconfig文件的路径来初始化一个clientset。当然,client-go可以使用in-cluster config的方式来初始化这些客户端。只不过一般我们调试的时候使用kubeconfig文件来调试。
那么我们来看下RESTClient客户端的实现以此来深入理解:
先看RESTClientFor函数的实现:
func RESTClientFor(config *Config) (*RESTClient, error) {
// 基础配置检验下,没有就直接报错退出
if config.GroupVersion == nil {
return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
}
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
// 有些配置没给配置,就默认配置下。
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config) // 获取基本的URL和apiPath,这个行程了基础的url,一会跟一下代码
if err != nil {
return nil, err
}
transport, err := TransportFor(config)
if err != nil {
return nil, err
}
var httpClient *http.Client
if transport != http.DefaultTransport {
httpClient = &http.Client{Transport: transport}
if config.Timeout > 0 {//是否设置超时时间,没有就给默认值
httpClient.Timeout = config.Timeout
}
}
return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient) // 新建的NewRESTClient
}
先看下defaultServerUrlFor函数的代码:
func defaultServerUrlFor(config *Config) (*url.URL, string, error) {
// TODO: move the default to secure when the apiserver supports TLS by default
// config.Insecure is taken to mean "I want HTTPS but don't bother checking the certs against a CA."
hasCA := len(config.CAFile) != 0 || len(config.CAData) != 0
hasCert := len(config.CertFile) != 0 || len(config.CertData) != 0
defaultTLS := hasCA || hasCert || config.Insecure
host := config.Host // 拿出hosts
if host == "" {
host = "localhost"
}
if config.GroupVersion != nil { // groupversion
return DefaultServerURL(host, config.APIPath, *config.GroupVersion, defaultTLS)
}
return DefaultServerURL(host, config.APIPath, schema.GroupVersion{}, defaultTLS)
}
看下NewRESTClient函数里面有什么:
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
}
base.RawQuery = ""
base.Fragment = ""
if config.GroupVersion == nil {
config.GroupVersion = &schema.GroupVersion{}
}
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
serializers, err := createSerializers(config)
if err != nil {
return nil, err
}
var throttle flowcontrol.RateLimiter
if maxQPS > 0 && rateLimiter == nil {
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
} else if rateLimiter != nil {
throttle = rateLimiter
}
return &RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
contentConfig: config,
serializers: *serializers,
createBackoffMgr: readExpBackoffConfig,
Throttle: throttle,
Client: client,
}, nil
}
看了写这个函数只是做了基本的封装。下面讲下如何使用RESTClient操作资源对象:
restClient.Get().Namespace("default").Resource("pods").Do()
Get()函数的返回,是个request对象:
func (c *RESTClient) Get() *Request {
return c.Verb("GET")
}
Namespace函数,只是将namespace设置进request字段:
func (r *Request) Namespace(namespace string) *Request {
if r.err != nil {
return r
}
if r.namespaceSet {
r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
return r
}
if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
return r
}
r.namespaceSet = true
r.namespace = namespace // 在request对象中设置namespace
return r
}
Resource函数:
func (r *Request) Resource(resource string) *Request {
if r.err != nil {
return r
}
if len(r.resource) != 0 {
r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
return r
}
if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
return r
}
r.resource = resource // 在request对象中设置resource
return r
}
Do函数:
func (r *Request) Do() Result {
r.tryThrottle()
var result Result
err := r.request(func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
return Result{err: err}
}
return result
}
着重看下里面的request函数:
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
//Metrics for total request latency
start := time.Now()
defer func() {
metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
}()
if r.err != nil {
klog.V(4).Infof("Error in request: %v", r.err)
return r.err
}
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
// 校验参数是否合理
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set during creation")
}
client := r.client
if client == nil {
client = http.DefaultClient
}
// Right now we make about ten retry attempts if we get a Retry-After response.
maxRetries := 10
retries := 0
for {
url := r.URL().String()// 需要着重看下这个URL函数
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return err
}
if r.timeout > 0 {
if r.ctx == nil {
r.ctx = context.Background()
}
var cancelFn context.CancelFunc
r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
defer cancelFn()
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
}
req.Header = r.headers
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
if retries > 0 {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal throttler.
r.tryThrottle()
}
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
if err != nil {
// "Connection reset by peer" is usually a transient error.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as
// they are not idempotent.
if !net.IsConnectionReset(err) || r.verb != "GET" {
return err
}
// For the purpose of retry, we set the artificial "retry-after" response.
// TODO: Should we clean the original response if it exists?
resp = &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}
}
done := func() bool {
// Ensure the response body is fully read and closed
// before we reconnect, so that we reuse the same TCP
// connection.
defer func() {
const maxBodySlurpSize = 2 << 10
if resp.ContentLength <= maxBodySlurpSize {
io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
}
resp.Body.Close()
}()
retries++
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
_, err := seeker.Seek(0, 0)
if err != nil {
klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
fn(req, resp)
return true
}
}
klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
return false
}
fn(req, resp)
return true
}()
if done {
return nil
}
}
}
着重看下r.URL函数,它是生成URL用的,这边你就会发现之前在request中设置的resource,namepspace之类的都用在这边拼接url上了:
func (r *Request) URL() *url.URL {
p := r.pathPrefix
if r.namespaceSet && len(r.namespace) > 0 {
p = path.Join(p, "namespaces", r.namespace)
}
if len(r.resource) != 0 {
p = path.Join(p, strings.ToLower(r.resource))
}
// Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
p = path.Join(p, r.resourceName, r.subresource, r.subpath)
}
finalURL := &url.URL{}
if r.baseURL != nil {
*finalURL = *r.baseURL
}
finalURL.Path = p
query := url.Values{}
for key, values := range r.params {
for _, value := range values {
query.Add(key, value)
}
}
// timeout is handled specially here.
if r.timeout != 0 {
query.Set("timeout", r.timeout.String())
}
finalURL.RawQuery = query.Encode()
return finalURL
}
总结下这个小节:api-server暴露了很多接口,而RESTClient就会对这些api的封装,所以我们看到源码中有生成对应的URL之类的。
ClientSet客户端
ClientSet客户端默认是对k8s内置资源对象客户端的集合,通过ClientSet客户端可以操作k8s的内置资源对象。下面会介绍为什么clientset是内置资源对象客户端的集合的。
来段代码做引入:
func getClientSet(kubeconfig string) {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
// create the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
for {
pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
// Examples for error handling:
// - Use helper functions like e.g. errors.IsNotFound()
// - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
namespace := "default"
pod := "example-xxxxx"
_, err = clientset.CoreV1().Pods(namespace).Get(pod, metav1.GetOptions{})
if errors.IsNotFound(err) {
fmt.Printf("Pod %s in namespace %s not found\n", pod, namespace)
} else if statusError, isStatus := err.(*errors.StatusError); isStatus {
fmt.Printf("Error getting pod %s in namespace %s: %v\n",
pod, namespace, statusError.ErrStatus.Message)
} else if err != nil {
panic(err.Error())
} else {
fmt.Printf("Found pod %s in namespace %s\n", pod, namespace)
}
time.Sleep(10 * time.Second)
}
}
clientcmd.BuildConfigFromFlags函数上面已经分析过了,这边就分析一下不一样的地方,生成clientset的函数kubernetes.NewForConfig,这边我们可以看到cs这是了很多k8s的内置资源客户端对象,还有设置了一个discoveryClient:
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var cs Clientset
var err error
// 这边开始会初始化k8s内置的资源的clientset
cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.appsV1, err = appsv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.appsV1beta1, err = appsv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.appsV1beta2, err = appsv1beta2.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.auditregistrationV1alpha1, err = auditregistrationv1alpha1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.authenticationV1, err = authenticationv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.authenticationV1beta1, err = authenticationv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.authorizationV1, err = authorizationv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.authorizationV1beta1, err = authorizationv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.autoscalingV1, err = autoscalingv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.autoscalingV2beta1, err = autoscalingv2beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.autoscalingV2beta2, err = autoscalingv2beta2.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.batchV1, err = batchv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.batchV1beta1, err = batchv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.batchV2alpha1, err = batchv2alpha1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.certificatesV1beta1, err = certificatesv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.coordinationV1beta1, err = coordinationv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.coordinationV1, err = coordinationv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.coreV1, err = corev1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.eventsV1beta1, err = eventsv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.extensionsV1beta1, err = extensionsv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.networkingV1, err = networkingv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.networkingV1beta1, err = networkingv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.nodeV1alpha1, err = nodev1alpha1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.nodeV1beta1, err = nodev1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.policyV1beta1, err = policyv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.rbacV1, err = rbacv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.rbacV1beta1, err = rbacv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.rbacV1alpha1, err = rbacv1alpha1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.schedulingV1alpha1, err = schedulingv1alpha1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.schedulingV1beta1, err = schedulingv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.schedulingV1, err = schedulingv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.settingsV1alpha1, err = settingsv1alpha1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.storageV1beta1, err = storagev1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.storageV1, err = storagev1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.storageV1alpha1, err = storagev1alpha1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy) // 注意啦,clientset里面还放了一个DiscoverClient
if err != nil {
return nil, err
}
return &cs, nil
}
然后追进NewForConfig看下,看到rest.RESTClientFor就知道是啥了吧,这函数里面就是RESTClient的实现了:
func NewForConfig(c *rest.Config) (*AppsV1beta1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
return &AppsV1beta1Client{client}, nil
}
现在理解了,为啥clientset客户端是每个k8s内置资源客户端对象的集合了吧。
DynamicClient
DynamicClient的特点就是除了可以使用k8s内置资源外,还可以使用CRD资源。dynamicClient的原理就是传入的资源数据都是使用map[string]interface{}结构。
func getDynamicClientExample(kubeconfig string) {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err)
}
client, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
deployment := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]interface{}{
"name": "demo-deployment",
},
"spec": map[string]interface{}{
"replicas": 2,
"selector": map[string]interface{}{
"matchLabels": map[string]interface{}{
"app": "demo",
},
},
"template": map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]interface{}{
"app": "demo",
},
},
"spec": map[string]interface{}{
"containers": []map[string]interface{}{
{
"name": "web",
"image": "nginx:1.12",
"ports": []map[string]interface{}{
{
"name": "http",
"protocol": "TCP",
"containerPort": 80,
},
},
},
},
},
},
},
},
}
// Create Deployment
fmt.Println("Creating deployment...")
result, err := client.Resource(deploymentRes).Namespace("").Create(deployment, metav1.CreateOptions{})
if err != nil {
panic(err)
}
fmt.Printf("Created deployment %q.\n", result.GetName())
}
直接从dynamic.NewForConfig(config)开始:
func NewForConfig(inConfig *rest.Config) (Interface, error) {
config := rest.CopyConfig(inConfig)
// for serializing the options
config.GroupVersion = &schema.GroupVersion{}
config.APIPath = "/if-you-see-this-search-for-the-break"
config.AcceptContentTypes = "application/json"
config.ContentType = "application/json"
config.NegotiatedSerializer = basicNegotiatedSerializer{} // this gets used for discovery and error handling types
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
restClient, err := rest.RESTClientFor(config) //还是去新建了一个RESTClient客户端
if err != nil {
return nil, err
}
return &dynamicClient{client: restClient}, nil
}
从代码中看dynamicClient内部还是restClient.
DiscoveryClient
DiscoveryClient是发现客户端,主要用于发现k8s api-server所支持的资源组、资源版本及资源信息。
先来个damo:
func getDisCoveryClient(kubeconfig string) {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err)
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err)
}
_, APIResourceList, err := discoveryClient.ServerGroupsAndResources()\
if err != nil {
panic(err)
}
for _, list := range APIResourceList {
}
}
直接看这个函数discovery.NewDiscoveryClientForConfig(config)吧,下面也可以看到就是使用了RESTClient:
func NewDiscoveryClientForConfig(c *restclient.Config) (*DiscoveryClient, error) {
config := *c
// 设置一些默认的参数
if err := setDiscoveryDefaults(&config); err != nil {
return nil, err
}
// 类似RESTClient客户端
client, err := restclient.UnversionedRESTClientFor(&config)
return &DiscoveryClient{restClient: client, LegacyPrefix: "/api"}, err
}
疑惑点
问:4种客户端其实都使用了restClient这个基础的客户端,但是有必要分的这么清吗?4种客户端有同时存在的必要吗?
答:我之前感觉只要有RESTClient就够了,至少没必要同时存在clientSet和DynamicClient,因为clientset客户端通过代码生成也能操作CRD资源。后来仔细想想,对于未知对象的获取,dynamicClient客户端就有用了,也就是说,clientset客户端只针对你知道是什么类型的对象是有用的,对于你不知道的类型,你只能时使用dynamicClient。
总结
client-go客户端是客户端访问k8s资源的一种RESTFul API封装,使得用户与k8s交互更方便,代码更容易理解。当然,client-go提供给用户的远不止这些。本节主要介绍了4中客户端形式,比较常用的还是能够直接操作k8s内置资源的clientset。client-go的github仓库上有官方提供的例子(https://github.com/kubernetes/client-go/blob/master/examples/in-cluster-client-configuration/main.go)
之后本公众号(“云原生手记”)还将继续推出client-go源码解析系列文章,敬请关注!
更多推荐
所有评论(0)