class ArgoRollout:
    """Manage one Argo Rollout object through the raw Kubernetes REST API.

    The class talks to the ``argoproj.io/v1alpha1`` ``rollouts`` endpoints of a
    single namespace using ``client.ApiClient().call_api``.

    Attributes:
        k8s_api: API client used for every request.
        app_name: Name of the rollout (also substituted into the template).
        namespace: Namespace the rollout lives in.
        image_name: Container image to deploy.
        port: Container / probe port written into the manifest.
        template_yaml_path: Path of the YAML manifest template.
    """

    # (exclusive upper replica bound, number of batches) for 3..99 replicas.
    # 0, 1 and 2 replicas are special-cased in ``_default_batch``.
    _BATCHES_BY_REPLICAS = (
        (6, 2),
        (20, 3),
        (40, 5),
        (60, 6),
        (80, 7),
        (90, 8),
        (100, 9),
    )

    # Probes of the first container whose httpGet port follows ``self.port``.
    _PROBES = ("livenessProbe", "readinessProbe", "startupProbe")

    def __init__(self, namespace, app_name, image_name, port=8080):
        self.k8s_api = client.ApiClient()
        self.app_name = app_name
        self.namespace = namespace
        self.image_name = image_name
        self.port = port
        self.template_yaml_path = "k8s-rollout-template.yml"

    def _request(self, url, method, **kwargs):
        """Call the API and return the response body parsed as JSON.

        ``_preload_content=False`` is always passed, so the first element of
        the result exposes the raw bytes as ``.data``; they are decoded as
        UTF-8 and parsed with ``json.loads``.
        """
        resp = self.k8s_api.call_api(url, method, _preload_content=False, **kwargs)
        return json.loads(resp[0].data.decode("utf-8"))

    @classmethod
    def _substitute(cls, node, mapping):
        """Return a copy of ``node`` with placeholders replaced in every string.

        Dict keys, dict values and list items are visited recursively.
        Replacements are applied in ``mapping`` order.  Working on the parsed
        structure (instead of on a JSON dump) keeps values containing quotes
        or backslashes intact.
        """
        if isinstance(node, dict):
            return {
                cls._substitute(key, mapping): cls._substitute(value, mapping)
                for key, value in node.items()
            }
        if isinstance(node, list):
            return [cls._substitute(item, mapping) for item in node]
        if isinstance(node, str):
            for placeholder, value in mapping.items():
                node = node.replace(placeholder, value)
        return node

    def _init_body(self):
        """Build the rollout manifest from the YAML template.

        Returns:
            The manifest as a dict with the ``__argo_rollout_name``,
            ``__namespace_name`` and ``__image_name`` placeholders filled in
            and the port fields of the first container set to ``self.port``.

        Raises:
            KeyError / IndexError: if the template lacks the first container's
                ``ports`` entry or one of the three probes.
        """
        with open(self.template_yaml_path) as f:
            template = yaml.safe_load(f)
        manifest = self._substitute(
            template,
            {
                "__argo_rollout_name": self.app_name,
                "__namespace_name": self.namespace,
                "__image_name": self.image_name,
            },
        )
        # Ports must be real ints, so they are assigned instead of substituted.
        container = manifest["spec"]["template"]["spec"]["containers"][0]
        container["ports"][0]["containerPort"] = self.port
        for probe in self._PROBES:
            container[probe]["httpGet"]["port"] = self.port
        return manifest

    def create_argo_rollouts(self):
        """Create the rollout from the template and return the API response."""
        body = self._init_body()
        url = "/apis/argoproj.io/v1alpha1/namespaces/{namespace}/rollouts".format(
            namespace=self.namespace
        )
        return self._request(url, "POST", body=body)

    def get_argo_rollout_info(self):
        """Fetch the rollout named ``self.app_name`` and return it as a dict."""
        url = "/apis/argoproj.io/v1alpha1/namespaces/{namespace}/rollouts/{app_name}".format(
            namespace=self.namespace, app_name=self.app_name
        )
        return self._request(url, "GET")

    def list_rollouts(self):
        """Return the names of all rollouts in the namespace.

        This is best-effort: any ``Exception`` raised while querying or
        parsing is printed and the names collected so far (possibly none) are
        returned.  ``KeyboardInterrupt`` / ``SystemExit`` propagate normally.
        """
        url = "/apis/argoproj.io/v1alpha1/namespaces/{namespace}/rollouts/".format(
            namespace=self.namespace
        )
        rollout = []
        try:
            rollout_by_ns = self._request(url, "GET")
            for deploy in rollout_by_ns["items"]:
                rollout.append(deploy["metadata"]["name"])
        except Exception as e:
            print(e)
        # Deliberately not ``finally: return`` - that would also swallow
        # BaseException subclasses such as KeyboardInterrupt.
        return rollout

    @classmethod
    def _default_batch(cls, replicas):
        """Pick the number of release batches for a given replica count.

        Raises:
            Exception: if ``replicas`` is 0 or 1 (no canary possible), is
                negative, or is 100 or more (no default defined).
        """
        if replicas == 0:
            raise Exception("replicas cannot be 0")
        if replicas == 1:
            raise Exception("replicas is 1, Unable to perform grayscale publishing")
        if replicas < 0:
            raise Exception("replicas cannot be negative")
        if replicas == 2:
            return 1
        for upper_bound, batch in cls._BATCHES_BY_REPLICAS:
            if replicas < upper_bound:
                return batch
        raise Exception(
            "replicas is {0}, no default batch is defined for 100 or more "
            "replicas; pass batch explicitly".format(replicas)
        )

    @staticmethod
    def _build_steps(batch, release_type, batch_wait_time, gray_percentage):
        """Build the canary ``steps`` list for ``batch`` release batches."""
        # Gray (canary) stage first, followed by an indefinite pause.
        steps = [{"setWeight": gray_percentage}, {"pause": {}}]
        weight_per_batch = 100 // batch
        for i in range(batch - 1):
            steps.append({"setWeight": weight_per_batch * (i + 1)})
            if release_type == "auto":
                steps.append({"pause": {"duration": batch_wait_time}})
            else:
                steps.append({"pause": {}})
        # The last batch always brings the rollout to 100%.
        steps.append({"setWeight": 100})
        steps.append({"pause": {"duration": batch_wait_time}})
        return steps

    def patch_rollouts(self, batch: int = None, release_type="auto", batch_wait_time=60, gray_percentage=1):
        """Update the rollout image and canary strategy, then PATCH it.

        Args:
            batch: Number of release batches.  When ``None`` it is derived
                from the rollout's current ``spec.replicas``.
            release_type: ``"auto"`` inserts timed pauses between batches;
                any other value inserts indefinite pauses.
            batch_wait_time: Pause ``duration`` used for timed pauses.
            gray_percentage: Weight of the initial gray (canary) step.

        Returns:
            The API response of the PATCH request as a dict.

        Raises:
            Exception: if ``batch`` is not positive, or if ``batch`` is
                ``None`` and no default exists for the replica count.
        """
        url = "/apis/argoproj.io/v1alpha1/namespaces/{namespace}/rollouts/{name}".format(
            namespace=self.namespace, name=self.app_name
        )
        body = self.get_argo_rollout_info()
        if batch is None:
            # Derive the batch count from the number of replicas.
            batch = self._default_batch(body["spec"]["replicas"])
        elif batch == 0:
            raise Exception("batch cannot be 0")
        elif batch < 0:
            raise Exception("batch must be a positive integer")
        # Update the image.
        body["spec"]["template"]["spec"]["containers"][0]["image"] = self.image_name
        # Update the release strategy.
        body["spec"]["strategy"]["canary"]["steps"] = self._build_steps(
            batch, release_type, batch_wait_time, gray_percentage
        )
        # Send the update.
        return self._request(
            url,
            "PATCH",
            body=body,
            header_params={'Content-Type': 'application/merge-patch+json'},
        )



Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐