from typing import Dict
import os
import subprocess
import urllib.request

import kubernetes
import yaml
from kubernetes.client.rest import ApiException


def read_yaml(file_path: str) -> Dict:
    req = urllib.request.urlopen(file_path)
    cfg = yaml.safe_load(req.read())
    return cfg
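

# A minimal alternative sketch for the case where the YAML lives on the local
# filesystem instead of behind an HTTP URL; read_yaml_file is an added name,
# not part of the original script.
def read_yaml_file(file_path: str) -> Dict:
    with open(file_path) as f:
        return yaml.safe_load(f)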


def get_secret(serviceaccount: str) -> str:
    # Read the service account token that Kubernetes mounts into the pod.
    command = f"cat {serviceaccount}/token"
    p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    out, err = p.communicate()
    return out.decode("utf-8")
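

# A minimal alternative sketch: since the token is mounted as a plain file,
# it can also be read directly, without spawning a shell. get_secret_direct
# is an added name, not part of the original script.
def get_secret_direct(serviceaccount: str) -> str:
    with open(f"{serviceaccount}/token") as f:
        return f.read()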


def create_inference(resource, secret, cacert, apiserver, namespace):
    configuration = kubernetes.client.Configuration()
    # Configure bearer token authorization: the service account token is
    # sent as "Authorization: Bearer <token>".
    configuration.api_key['authorization'] = secret
    configuration.api_key_prefix['authorization'] = 'Bearer'
    # Verify the API server certificate against the mounted cluster CA.
    configuration.ssl_ca_cert = cacert
    # Point the client at the in-cluster API server address.
    configuration.host = apiserver
    with kubernetes.client.ApiClient(configuration) as api_client:
        # Create an instance of the API class
        api_instance = kubernetes.client.CustomObjectsApi(api_client)
        
        api_instance.create_namespaced_custom_object(
            group="serving.kserve.io",
            version="v1beta1",
            namespace=namespace,
            plural="inferenceservices",
            body=resource,
        )
        print("Resource created")
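

# A minimal error-handling sketch around create_inference, using the
# ApiException imported above; create_inference_safe is an added helper,
# not part of the original script. A 409 response means an InferenceService
# with the same name already exists in the namespace.
def create_inference_safe(resource, secret, cacert, apiserver, namespace):
    try:
        create_inference(resource, secret, cacert, apiserver, namespace)
    except ApiException as e:
        if e.status == 409:
            print("InferenceService already exists")
        else:
            raise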


if __name__ == "__main__":
    APISERVER = "https://kubernetes.default.svc"
    SERVICEACCOUNT = "/var/run/secrets/kubernetes.io/serviceaccount"
    CACERT = f"{SERVICEACCOUNT}/ca.crt"
    namespace = "wangzy"
    print("Data processing module starting...")
    os.chdir(os.path.split(os.path.realpath(__file__))[0])
    print("Current working directory:", os.getcwd())
    # https://kubernetes.io/docs/tasks/run-application/access-api-from-pod/
    inference_yaml = "http://minio.storage.svc.cluster.local/wangzy/operator/mind/model/inference.yaml"
    my_resource = read_yaml(inference_yaml)
    secret = get_secret(SERVICEACCOUNT)
    create_inference(my_resource, secret, CACERT, APISERVER, namespace)
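
Because the script already runs inside the cluster, the manual token and CA wiring above can also be delegated to the client library. The snippet below is a minimal sketch of the same create call using kubernetes.config.load_incluster_config(), which reads the mounted service account credentials itself; create_inference_incluster is an added name, not part of the original script.

from kubernetes import client, config

def create_inference_incluster(resource, namespace):
    # load_incluster_config() picks up the token and CA certificate from
    # /var/run/secrets/kubernetes.io/serviceaccount automatically.
    config.load_incluster_config()
    api = client.CustomObjectsApi()
    api.create_namespaced_custom_object(
        group="serving.kserve.io",
        version="v1beta1",
        namespace=namespace,
        plural="inferenceservices",
        body=resource,
    )
    print("Resource created")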

Here, "http://minio.storage.svc.cluster.local/wangzy/operator/mind/model/inference.yaml" is a custom YAML file with the following content:

apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "mind-transformer-test"
  namespace: "wangzy"
  annotations:
    sidecar.istio.io/inject: "false"
spec:
  transformer:
    containers:
    - image: registry.cn-hangzhou.aliyuncs.com/rory/kubeflow1:mind_transformer_v0.0.1
      name: transformer-container
      command:
      - "python"
      - "-m"
      - "src.service.algorithms.mind.transformer"
      - --data_process_yaml
      - "http://minio.storage.svc.cluster.local/wangzy/operator/mind/config/data_process.yaml"
      - --model_train_yaml
      - "http://minio.storage.svc.cluster.local/wangzy/operator/mind/config/model_train.yaml"
      - --namespace
      - "wangzy-p"
  predictor:
    tensorflow:
      storageUri: "http://minio.storage.svc.cluster.local/wangzy/operator/mind/model/20220902.zip"
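
After the InferenceService is created, its readiness can be checked with the same CustomObjectsApi. Below is a minimal sketch, assuming the in-cluster configuration from the earlier snippet and the name mind-transformer-test from the YAML above; get_inference_status is an added helper, not part of the original article.

from kubernetes import client, config

def get_inference_status(name, namespace):
    config.load_incluster_config()
    api = client.CustomObjectsApi()
    obj = api.get_namespaced_custom_object(
        group="serving.kserve.io",
        version="v1beta1",
        namespace=namespace,
        plural="inferenceservices",
        name=name,
    )
    # KServe reports readiness under status.conditions and, once ready,
    # exposes the service address under status.url.
    return obj.get("status", {})

print(get_inference_status("mind-transformer-test", "wangzy"))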

References

  • https://kubernetes.io/docs/tasks/run-application/access-api-from-pod/
  • https://github.com/kubernetes-client/python/blob/master/examples/namespaced_custom_object.py
  • https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CustomObjectsApi.md#create_cluster_custom_object