通过 k8s API 审批 kubelet 发起的 CSR 请求，用于二进制部署 k8s 集群时自动添加节点的操作
通过 k8s API 审批 kubelet 发起的 CSR 请求，相当于执行 kubectl certificate approve node-csr-xxxxxxxxxxxxxxxxxxxxx。实现方式：使用 Python 的 requests 库携带 Bearer token 调用 API（import requests, json；完整代码见下文的 K8sapi 类，其 __init__(self, url, token) 保存 url、token 并构造请求头 headers）。
·
通过 k8s API 审批 kubelet 发起的 CSR 请求，效果等同于手动执行以下命令：
kubectl certificate approve node-csr-xxxxxxxxxxxxxxxxxxxxx
import requests, json
class K8sapi:
    """Small Kubernetes API client that approves CertificateSigningRequests.

    The client lists CSR objects through the ``certificates.k8s.io`` REST API
    and submits an ``Approved`` condition for each of them, which is the API
    equivalent of ``kubectl certificate approve <name>``.

    SECURITY NOTE: ``run()`` approves every CSR returned by the list call
    without inspecting the requester or the current status. Only use it
    against clusters where that is acceptable.
    """

    # Collection path of the CSR resource.
    # NOTE(review): certificates.k8s.io/v1beta1 is no longer served as of
    # Kubernetes 1.22; newer clusters need the v1 API (whose conditions also
    # require a "status" field). Confirm the target cluster version.
    _CSR_PATH = '/apis/certificates.k8s.io/v1beta1/certificatesigningrequests'

    def __init__(self, url, token, verify=False):
        """Store connection settings and build the request headers.

        Args:
            url: Base URL of the API server, e.g. ``https://127.0.0.1:6443``.
            token: Bearer token sent in the ``Authorization`` header.
            verify: Passed to ``requests`` as ``verify``. Defaults to
                ``False`` (TLS certificate verification disabled) to keep
                the historical behavior; pass ``True`` or a CA bundle path
                to enable verification.
        """
        # Tolerate a trailing slash so the joined URL has no "//".
        self._url = url.rstrip('/')
        self._token = token
        self._verify = verify
        self.headers = {
            "Authorization": "Bearer " + self._token,
            "Accept": "application/json, */*",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _csr_name(item):
        """Return the ``metadata.name`` of a CSR item as text (``str``).

        Raises:
            KeyError / TypeError: if the item has no ``metadata.name``.
        """
        name = item['metadata']['name']
        # The name is concatenated into a URL and serialized to JSON, both
        # of which need str; decode defensively if bytes are handed in.
        if isinstance(name, bytes):
            name = name.decode('utf-8')
        return name

    def get_result(self):
        """Fetch up to 20 CSR objects from the API server.

        Returns:
            The ``items`` value of the JSON response, or ``None`` when the
            request fails or the response carries no ``items`` key. Errors
            are printed rather than raised.
        """
        api = self._CSR_PATH + '?limit=20'
        try:
            response = requests.get(self._url + api, headers=self.headers,
                                    verify=self._verify, timeout=10)
            return response.json().get('items')
        except Exception as err:
            # Best-effort script: report the failure and let run() see None.
            print(err)
            return None

    def put_result(self, nodescr):
        """Approve the CSR named ``nodescr`` via its ``/approval`` subresource.

        Args:
            nodescr: Name of the CSR object (``str``; ``bytes`` is decoded).

        The parsed JSON response is printed; errors are printed, not raised.
        """
        if isinstance(nodescr, bytes):
            nodescr = nodescr.decode('utf-8')
        csrurl = self._CSR_PATH + '/' + nodescr
        data = {
            "kind": "CertificateSigningRequest",
            "apiVersion": "certificates.k8s.io/v1beta1",
            "metadata": {
                "name": nodescr
            },
            "status": {"conditions": [{"type": "Approved", "reason": "pythonapiApprove"}]}
        }
        try:
            json_data = requests.put(self._url + csrurl + '/approval', headers=self.headers,
                                     data=json.dumps(data), verify=self._verify, timeout=10)
            print(json_data.json())
        except Exception as err:
            print(err)

    def run(self):
        """List CSRs and submit an approval for each one.

        A malformed item is reported and skipped so that it does not prevent
        the remaining CSRs from being processed.
        """
        resultlist = self.get_result()
        if not resultlist:
            return
        for item in resultlist:
            try:
                name = self._csr_name(item)
                print("Approve nodecsr:{}".format(name))
                self.put_result(name)
            except Exception as err:
                print("add node csr err", err)
if __name__ == '__main__':
    # Script entry point: connect to the local API server with the given
    # bearer token (placeholder value here) and approve the listed CSRs.
    K8sapi('https://127.0.0.1:6443', 'xxxxxxxx').run()
更多推荐
已为社区贡献4条内容
所有评论(0)