有个业务是基于 Apache Karaf 开发的,运行在 K8S 里面,跑了一堆的 bundle,偶尔里面挂一两个 bundle 容器是没什么感知的,但是业务可能会局部异常。要推动业务优化的话,周期就会拉得很长,因此先弄个定时拨测来监控下 bundle 是不是都在正常运行的。
看了下要查看 Karaf 里面的 bundle 需要 ssh 进去执行 bundle list 命令,因此基于之前的经验利用 Python paramiko 快速实现了诊断逻辑,并接入我们的拨测平台定时运行。结果在实际拨测的时候经常出现命令执行后 recv 拉取结果超时的问题。

Tips:不想阅读啰嗦繁琐的定位过程,可以直接跳到文末看最后的代码和结论。
其中 SSH 执行命令的关键代码大概长这个样子:
| # -*- coding: utf-8 -*- |
| from paramiko import SSHClient, AutoAddPolicy |
| from func_timeout import func_set_timeout, exceptions |
| |
| |
| class RemoteCMD(): |
| """执行远程命令 |
| """ |
| def _init_connection(self, |
| ip: str, |
| port: int, |
| user: str, |
| passwd: str, |
| timeout: int = 5): |
| """初始化连接(这里单独抽出可以避免 func 超时未关闭 session) |
| |
| Args: |
| ip (str): IP 地址 |
| port (int): SSH 端口 |
| user (str): 用户名 |
| passwd (str): 密码 |
| timeout (int, optional): 超时时长(秒). Defaults to 5. |
| |
| Returns: |
| paramiko object: paramiko 客户端对象 |
| """ |
| client = paramiko.SSHClient() |
| client.load_system_host_keys() |
| client.set_missing_host_key_policy(AutoAddPolicy()) |
| client.connect(hostname=ip, |
| port=int(port), |
| username=user, |
| password=passwd, |
| timeout=timeout, |
| allow_agent=False, |
| look_for_keys=False) |
| |
| client = client.invoke_shell() |
| return client |
| |
| # 为了防止超时,加了个 func_timeout 装饰器来限制执行时长 |
| @func_set_timeout(5) |
| def _send_cmd(self, client: object, cmd: str, recv_size: int = 512): |
| """发送命令、拉取结果 |
| |
| Args: |
| client (object): paramiko 客户端对象 |
| cmd (str): 要执行的命令 |
| recv_size (int, optional): 单次拉取结果数据的大小. Defaults to 512. |
| |
| Returns: |
| string: 执行结果 |
| """ |
| |
| client.send(f"{str(cmd).strip()}\n") |
| ret_info = "" |
| while True: |
| data = client.recv(256).decode() |
| if "GMT" in data: # GMT 是这个 case 下命令的结束标识 |
| break |
| ret_info += data |
| |
| return ret_info |
| |
| def command(self, |
| ip: str, |
| port: int, |
| user: str, |
| passwd: str, |
| cmd: str, |
| timeout: int = 5): |
| """执行远程命令 |
| |
| Args: |
| ip (str): IP 地址 |
| port (int): SSH 端口 |
| user (port): 用户名 |
| passwd (str): 密码 |
| cmd (str): 要执行的命令 |
| timeout (int, optional): 超时时长(秒). Defaults to 5. |
| |
| Returns: |
| tuple: (True/False, 执行结果或报错) |
| """ |
| client = None |
| try: |
| client = self._init_connection(ip, port, user, passwd, timeout) |
| |
| except Exception as err: # pylint: disable=broad-except |
| return False, f"SSH 连接出现异常:{err}" |
| |
| try: |
| return True, self._send_cmd(client, cmd) |
| |
| except exceptions.FunctionTimedOut: |
| return False, "SSH 执行出现超时错误!" |
| |
| except Exception as err: # pylint: disable=broad-except |
| return False, f"SSH 执行出现其他异常:{err}" |
| |
| finally: |
| if client: |
| client.close() |
非常简单的逻辑,结果跑起来每天会有几次拨测返回"SSH 执行出现超时错误",通过定位发现超时问题主要是在 client.recv 拉取执行结果这里,因此搜索了下“paramiko invoke_shell recv 超时”看了一些前人的经验,很多都说是 stderr buffer 等方面的问题,需要用 recv_stderr 来拉取,结果尝试了下也不太靠谱。
继续找,终于找到一个比较接近的 case:有关 paramiko 使用 send 和 recv 的心得和出现的疑问 _cccccccc123 的博客-CSDN 博客 _paramiko recv,他最终的解决办法是在发命令之后先执行一次 recv,然后在正式执行 recv 之前休眠 1 秒,我也试了下,还真稳定了很多,诶嘿?
感觉不是太优雅,所以继续看了下paramiko 官方手册,在手册找到了一个叫做 recv_ready 的函数的介绍如下:
recv_ready
()
Returns true if data is buffered and ready to be read from this channel. A False
result does not mean that the channel has closed; it means you may need to wait before more data arrives.
Returns:
True
if a recv
call on this channel would immediately return at least one byte; False
otherwise.
简单解释就是当通道数据已缓冲完毕、读取状态已就绪时返回 True,诶嘿,看起来很像是这个。
快速将代码如下改动试了下:
| # -*- coding: utf-8 -*- |
| from paramiko import SSHClient, AutoAddPolicy |
| from func_timeout import func_set_timeout, exceptions |
| |
| |
| class RemoteCMD(): |
| """执行远程命令 |
| """ |
| def _init_connection(self, |
| ip: str, |
| port: int, |
| user: str, |
| passwd: str, |
| timeout: int = 5): |
| """初始化连接(这里单独抽出可以避免 func 超时未关闭 session) |
| |
| Args: |
| ip (str): IP 地址 |
| port (int): SSH 端口 |
| user (str): 用户名 |
| passwd (str): 密码 |
| timeout (int, optional): 超时时长(秒). Defaults to 5. |
| |
| Returns: |
| paramiko object: paramiko 客户端对象 |
| """ |
| |
| client = paramiko.SSHClient() |
| client.load_system_host_keys() |
| client.set_missing_host_key_policy(AutoAddPolicy()) |
| client.connect(hostname=ip, |
| port=int(port), |
| username=user, |
| password=passwd, |
| timeout=timeout, |
| allow_agent=False, |
| look_for_keys=False) |
| |
| client = client.invoke_shell() |
| return client |
| |
| # 为了防止超时,加了个 func_timeout 装饰器来限制执行时长 |
| @func_set_timeout(5) |
| def _send_cmd(self, client: object, cmd: str, recv_size: int = 512): |
| """发送命令、拉取结果 |
| |
| Args: |
| client (object): paramiko 客户端对象 |
| cmd (str): 要执行的命令 |
| recv_size (int, optional): 单次拉取结果数据的大小. Defaults to 512. |
| |
| Returns: |
| string: 执行结果 |
| """ |
| |
| client.send(f"{str(cmd).strip()}\n") |
| # 等待就绪 |
| while not client.recv_ready(): |
| time.sleep(0.02) |
| ret_info = "" |
| while True: |
| data = client.recv(256).decode() |
| if "GMT" in data: # GMT 是这个 case 的结束标识 |
| break |
| ret_info += data |
| |
| return ret_info |
| |
| def command(self, |
| ip: str, |
| port: int, |
| user: str, |
| passwd: str, |
| cmd: str, |
| timeout: int = 5): |
| """执行远程命令 |
| |
| Args: |
| ip (str): IP 地址 |
| port (int): SSH 端口 |
| user (port): 用户名 |
| passwd (str): 密码 |
| cmd (str): 要执行的命令 |
| timeout (int, optional): 超时时长(秒). Defaults to 5. |
| |
| Returns: |
| tuple: (True/False, 执行结果或报错) |
| """ |
| client = None |
| try: |
| client = self._init_connection(ip, port, user, passwd, timeout) |
| |
| except Exception as err: # pylint: disable=broad-except |
| return False, f"SSH 连接出现异常:{err}" |
| |
| try: |
| return True, self._send_cmd(client, cmd) |
| |
| except exceptions.FunctionTimedOut: |
| return False, "SSH 执行出现超时错误!" |
| |
| except Exception as err: # pylint: disable=broad-except |
| return False, f"SSH 执行出现其他异常:{err}" |
| |
| finally: |
| if client: |
| client.close() |
这个代码跑了上 1000 遍,发现偶尔还有零星超时的。通过定位,发现其实是卡在最后一次 recv,如果数据拉完了但是继续执行 recv 的话就会夯在那里不动了,因此需要有一个准确 break 循环的逻辑。而上述代码其实是加了"GMT"作为结束标识的,那这里的问题可能是因为 GMT 恰好卡在 2 次 recv 被截断了,导致没能正确退出循环。
因此,必须要设定一个非常准确的单字符或者正则表达式作为结束标识,且要用总的拼接结果进行匹配,因为用单次拉取的结果匹配可能会存在数据截断导致匹配不上卡住的问题(如上面第二段代码)。
最终代码改动如下:
| import re |
| import time |
| from paramiko import SSHClient, AutoAddPolicy |
| from func_timeout import func_set_timeout, exceptions |
| |
| |
| class RemoteCMD(): |
| """执行远程命令 |
| """ |
| def _init_connection(self, |
| ip: str, |
| port: int, |
| user: str, |
| passwd: str, |
| timeout: int = 5): |
| """初始化连接(这里单独抽出可以避免 func 超时未关闭 session) |
| |
| Args: |
| ip (str): IP 地址 |
| port (int): SSH 端口 |
| user (str): 用户名 |
| passwd (str): 密码 |
| timeout (int, optional): 超时时长(秒). Defaults to 5. |
| |
| Returns: |
| paramiko object: paramiko 客户端对象 |
| """ |
| client = SSHClient() |
| # client.load_system_host_keys() |
| client.set_missing_host_key_policy(AutoAddPolicy()) |
| client.connect(hostname=ip, |
| port=int(port), |
| username=user, |
| password=passwd, |
| timeout=timeout, |
| allow_agent=False, |
| look_for_keys=False) |
| |
| client = client.invoke_shell() |
| return client |
| |
| def _match_prompt(self, content: str, regex: str): |
| """结束提示符匹配 |
| |
| Args: |
| content (str): 需要匹配的字符串 |
| regex (str): 需要匹配的关键词或正则表达式 |
| |
| Returns: |
| bool: True/False |
| """ |
| if re.search(regex, content): |
| return True |
| |
| return False |
| |
| @func_set_timeout(5) |
| def _send_cmd(self, |
| client: object, |
| cmd: str, |
| recv_end_prompt: str, |
| recv_size: int = 512): |
| """发送命令、拉取结果 |
| |
| Args: |
| client (object): paramiko 客户端对象 |
| cmd (str): 要执行的命令 |
| recv_size (int, optional): 单次拉取结果数据的大小. Defaults to 512. |
| |
| Returns: |
| string: 执行结果 |
| """ |
| client.send(f"{str(cmd).strip()}\n") |
| # 等待就绪 |
| while not client.recv_ready(): |
| time.sleep(0.2) |
| |
| result = "" |
| while not self._match_prompt(result, recv_end_prompt): |
| result += client.recv(recv_size).decode() |
| |
| return result |
| |
| def command(self, |
| ip: str, |
| port: int, |
| user: str, |
| passwd: str, |
| cmd: str, |
| end_prompt: str=r"#\s$", |
| timeout: int = 5): |
| """执行远程命令 |
| |
| Args: |
| ip (str): IP 地址 |
| port (int): SSH 端口 |
| user (port): 用户名 |
| passwd (str): 密码 |
| cmd (str): 要执行的命令 |
| timeout (int, optional): 超时时长(秒). Defaults to 5. |
| |
| Returns: |
| tuple: (True/False, 执行结果或报错) |
| """ |
| client = None |
| try: |
| client = self._init_connection(ip, port, user, passwd, timeout) |
| |
| except Exception as err: # pylint: disable=broad-except |
| return False, f"SSH 连接出现异常:{err}" |
| |
| try: |
| return True, self._send_cmd(client, cmd, end_prompt) |
| |
| except exceptions.FunctionTimedOut: |
| return False, "SSH 执行出现超时错误!" |
| |
| except Exception as err: # pylint: disable=broad-except |
| return False, f"SSH 执行出现其他异常:{err}" |
| |
| finally: |
| if client: |
| client.close() |
最后总结一下:
卡住的根本原因:recv 已经拉取完毕之后,再去执行 recv 就会一直卡着等待通道返回数据,直到超时。
所以,解决这个问题的关键点是需要有一个固定、准确的结束标识来跳出 recv 循环,参考以下 2 个 Tips:
1. 在执行命令之后利用 recv_read()方法等待命令执行就绪才开始执行 recv 拉取数据,否则可能拉到不完整的输出;
2. 判断 recv 结束方法:recv 拉取执行结果时,必须使用能准确判定结束的单字符或正则表达式作为标识。
文章来源: 张戈博客 文章来源:
所有评论(0)