ser2net 串口转网口

修改配置文件

    vim   /etc/ser2net.conf
    # telnet 
    #  <TCP port>:<state>:<timeout>:<device>:<options>
    20053:telnet:14400:/dev/ttyUSB5:115200 8DATABITS NONE 1STOPBIT LOCAL banner

重启 ser2net 服务

sudo service ser2net restart

 

运行

python runner.py 123.yaml

cli.log

test.log

 

 runner.py

核心: python  pexpect.spawn 的使用

参考:https://www.cnblogs.com/mmdln/p/9006274.html

           http://codingdict.com/sources/py/pexpect/12976.html

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pexpect
import os
try:
    import urlparse
except Exception:
    import urllib.parse as urlparse
import logging
import argparse
try:
    import ConfigParser
except Exception:
    import configparser as ConfigParser
import yaml
import sys
import time

from configobj import ConfigObj, ConfigObjError


class Logger(object):
    """
    my logger tools
    """
    default_formatter = logging.Formatter(
        fmt='%(asctime)s %(levelname)-8s: %(name)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    def __init__(self):
        pass

    @staticmethod
    def add_stream_handler(stream=sys.stdout, level=logging.INFO, formatter=default_formatter):
        logger = logging.getLogger("")
        handler = logging.StreamHandler(stream)
        handler.setFormatter(formatter)
        handler.setLevel(level)
        logger.addHandler(handler)

    @staticmethod
    def add_file_handler(logfile=None, level=logging.DEBUG, formatter=default_formatter):
        if logfile is not None:
            logger = logging.getLogger("")
            handler = logging.FileHandler(logfile, "w")
            handler.setFormatter(formatter)
            handler.setLevel(level)
            logger.addHandler(handler)

    @staticmethod
    def log_test(stream=sys.stdout, stream_level=logging.INFO, logfile=None, logfile_level=logging.DEBUG, formatter=default_formatter):
        Logger.add_file_handler(logfile, logfile_level, formatter)
        Logger.add_stream_handler(stream, stream_level, formatter)
        logger = logging.getLogger("")
        logger.setLevel(logging.DEBUG)


Logger.log_test(logfile="test.log")
cli_servers = ["telnet", "ssh", "raw", "tio"]
log = logging.getLogger("runner")


class log_pexpect(pexpect.spawn):
    def __init__(self, *args, **kwargs):
        super(log_pexpect, self).__init__(*args, **kwargs)
        self.log_test = False

    def expect(self, *args, **kwargs):
        _e = super(log_pexpect, self).expect(*args, **kwargs)
        if self.log_test:
            log.info(self.before)
        return _e

    # I did not find a way to make the telnet client not echo back our
    # commands. This workaround consumes the echo and allows this pattern:
    #         sendline("print 42")
    #         expect("42")  # match command output and not the command itself
    #         expect(prompt)
    # Without this patch,  expect("42") will match the echo first (print 42),
    # and leave the real output untouched
    def sendline(self, s='', expect_echo=True):
        n = super(log_pexpect, self).sendline(s)
        if expect_echo:
            if s:
                self.expect_exact(s)
            self.expect_exact(os.linesep)
        return n

    def set_logfile(self, logfile):
        # logfile_read contains the whole session but has the advantage over
        # logfile that it is not obfuscated by duplicated lines
        if logfile:
            self.logfile_read = open(logfile, "w")
            self.livelog = open(logfile, "r")


def validate_cli(url):
    assert url.scheme in cli_servers, "bad console type \"%s\"" % url.scheme
    assert url.netloc != "", "console server is missing"


def parse_cli_url(string):
    """Check that the argument is a supported console URL"""
    url = urlparse.urlparse(string)
    try:
        validate_cli(url)
    except AssertionError:
        log.error("%s is not a valid console URL" % string)
        raise
    return url


class TelnetCLI(log_pexpect):
    def __init__(self, cli_url, logfile=None, do_spawn=True):
        self.cli_url = parse_cli_url(cli_url)
        if not do_spawn:
            self.closed = True
            return
        cmd = " ".join(self.cmd())
        super(TelnetCLI, self).__init__(cmd)
        self.set_logfile(logfile)

        self.uboot_prompt = "=> "
        self.linux_prompt = "[rO]\S+ *[$#] |[rO]\S+ .*\][$#] "
        self.prompt = self.uboot_prompt
        # Match the last line of the telnet banner
        self.expect("Escape character is ")
        # Wait a little to make sure the connection is really open. Far away or
        # slow servers take about 1.15 seconds to reply.
        index = self.expect(["Connection closed",
                             pexpect.EOF,
                             pexpect.TIMEOUT], timeout=1.3)
        if index == 0 or self.eof():
            log.error("connection failed")
            raise pexpect.EOF(self.before + self.after)

    def cmd(self):
        host, port = self.cli_url.netloc.split(":")
        return ["telnet", host, port]

    def set_uboot_prompt(self, prompt=None):
        if not prompt:
            self.prompt = self.uboot_prompt
        else:
            self.prompt = prompt

    def set_linux_prompt(self, prompt=None):
        if not prompt:
            self.prompt = self.linux_prompt
        else:
            self.prompt = prompt

    def close(self):
        # close() in pexpect.py has one bug, it can not close telnet properly
        # in some situations, see https://bugzilla.redhat.com/show_bug.cgi?id=89653,
        # so override close() here
        if not self.closed:
            self.sendcontrol("]")
            self.expect(">", 3)
            self.sendline("quit", expect_echo=False)
            super(TelnetCLI, self).close()


class PromptNotFound(Exception):
    """An error from trying to convert a command line string to a type."""
    pass


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("config", nargs='+', help="configuration for the test")

    args = parser.parse_args()
    return args


def send_cmd(cli, cmd='', timeout=30, check_prompt=False, check_ret=False,
             expect_echo=True):
    """ Uboot command wrapper that allows multiple types of checks.

    :param cli: command line interface object
    :param cmd: command string
    :param timeout: expect timeout while waiting for prompt
    :param check_prompt: verify prompt after command execution
    :type check_prompt: boolean
    :param check_ret: verify command return code (raise error if not 0)
    :type check_ret: boolean
    :param expect_echo: expect the command echo before continuing, in order
        to avoid timeouts when incomplete echo is displayed
    :type expect_echo: boolean
    :return: command output as string (when using check_prompt or check_ret)
    """
    if not timeout:
        timeout = 30
    log.info("Sending: %s, timeout=%s, prompt=%s, ret=%s, echo=%s" % (cmd, timeout, check_prompt, check_ret, expect_echo))
    cli.sendline(cmd, expect_echo=expect_echo)
    cmd_output = None

    # These codes are workaround for unstability of LS2 simulator.
    # Comment out, or else lars needs to wait for each command 2 seconds.
    """
    # check whether the cmd has been fully sent out,
    # if not, will retry 3 times
    re_send = 3
    while re_send > 0:
        re_send -= 1
        if not valid_uboot_cmd(cli):
            log.info("Resend cmd %s" % cmd)
            cli.sendline(cmd, expect_echo=expect_echo)
        else:
            break
    """

    if check_prompt or check_ret:
        try:
            cli.expect(cli.prompt, timeout)
        except pexpect.TIMEOUT:
            log.error("Prompt not found after \"%s\"! Command output:\n%s" %
                      (cmd, cli.before))
            raise PromptNotFound("Prompt not found after \"%s\"!" % cmd)
        cmd_output = cli.before
    if check_ret:
        log.debug("Return code check")
        cli.sendline("echo $?") #check ret code
        try:
            cli.expect('^0', timeout=2)
        except pexpect.TIMEOUT:
            log.error("Command retcode non-zero for \"%s\"! Command output:\n%s" %
                      (cmd, cmd_output))
            raise Exception("Command retcode non-zero for \"%s\"!" % cmd)
        cli.expect(cli.prompt, timeout=2)
    return cmd_output


def send_uboot_cmd(cli, cmd='', timeout=30, check_prompt=False, check_ret=False,
                   expect_echo=True):
    """ Uboot command wrapper that allows multiple types of checks.

    :param cli: command line interface object
    :param cmd: command string
    :param timeout: expect timeout while waiting for prompt
    :param check_prompt: verify prompt after command execution
    :type check_prompt: boolean
    :param check_ret: verify command return code (raise error if not 0)
    :type check_ret: boolean
    :param expect_echo: expect the command echo before continuing, in order
        to avoid timeouts when incomplete echo is displayed
    :type expect_echo: boolean
    :return: command output as string (when using check_prompt or check_ret)
    """
    if not timeout:
        timeout = 30
    log.info("Sending: %s, timeout=%s, prompt=%s, ret=%s, echo=%s" % (cmd, timeout, check_prompt, check_ret, expect_echo))
    cli.sendline(cmd, expect_echo=expect_echo)
    cmd_output = None

    if check_prompt or check_ret:
        try:
            n = cli.expect([cli.prompt, "Hit any key to stop autoboot:"], timeout)
            if n == 1:
                cli.sendline("\n")
        except pexpect.TIMEOUT:
            log.error("Prompt not found after \"%s\"! Command output:\n%s" %
                      (cmd, cli.before))
            raise PromptNotFound("Prompt not found after \"%s\"!" % cmd)
        cmd_output = cli.before

    else:
        n = cli.expect([cli.prompt, "Hit any key to stop autoboot:"], timeout=300)
        if n == 1:
            cli.sendline("\n")
    if check_ret:
        log.debug("Return code check")
        cli.sendline("echo $?") #check ret code
        try:
            cli.expect('^0', timeout=2)
        except pexpect.TIMEOUT:
            log.error("Command retcode non-zero for \"%s\"! Command output:\n%s" %
                      (cmd, cmd_output))
            raise Exception("Command retcode non-zero for \"%s\"!" % cmd)
        cli.expect(cli.prompt, timeout=2)
    return cmd_output


def config_shell(cli, columns=2000):
    """Configures shell with default values - removes dependency on rootfs.
    Use this before first operation in linux console to ensure a fixed prompt
    and a convenient raw length for long commands

    :param cli: command line interface object
    :param columns: longest command line should be lower than this to avoid
        command echo issues
    """
    log.info("Configuring shell")

    try:
        send_cmd(cli, "which stty", check_ret=True)
    except Exception:
        log.warning("NO stty! Please enable CONFIG_BUSYBOX_CONFIG_STTY!")
    else:
        send_cmd(cli, "stty columns 2000", check_ret=True)

    send_cmd(cli, "export TERM=linux", check_ret=True)

    send_cmd(cli, "export EXTRACT_UNSAFE_SYMLINKS=1", check_ret=True)
    send_cmd(cli, "alias ls='ls --color=never'", check_ret=True)
    send_cmd(cli, "alias grep='grep --color=never'", check_ret=True)
    send_cmd(cli, "mkdir -p /media/ram", check_ret=True)
    send_cmd(cli, "mount -t tmpfs tmpfs /media/ram/", check_ret=True, timeout=2)


def check_boot(cli, timeout=300):
    """Checks for successful login. Verifies the kernel bootlog and logs
    warnings and errors. By default, when a fatal error is encountered
    it raises an exception

    :param cli: command line interface object
    :param cfg: board object (or any dict containing the necessary info)
    :param cont_on_error: if ``True``, continue if an error keyword is found
    :param dump_kernel_cfg: if ``True``, execute zcat /proc/config.gz cmd
    :param timeout: timeout of linux startup
    """
    fatal_errors = [
        "(?i)kernel panic",
        "(?i)Internal error: Oops:",
        "(?i)rebooting in * seconds",
        "ramdisk - allocation error"
    ]
    warnings = [
        "(?i)failed",
        "(?i)warning",
        "(?i)call trace[\s\S]*end trace"
    ]
    password = "root"
    match_list = ["login:", cli.prompt, "press Control-D to continue"] + fatal_errors + warnings
    cli.set_linux_prompt()
    log.debug("linux_prompt as %s" % cli.prompt)

    while True:
        found = cli.expect(match_list, timeout=timeout)
        if found == 0:
            # do login
            cli.sendline("root")
            found = cli.expect(["Password:", cli.prompt], timeout=30)
            if found == 0:
                cli.sendline(password, expect_echo=False)
                cli.sendline(" ")
                cli.expect(cli.prompt)
            cli.sendline(" ")
            config_shell(cli)
            break


def parse_command(cmd):
    cmds = {
        'cmd': '',
        'timeout': None,
        'check_prompt': 'False',
        'check_ret': 'False',
        'expect_echo': 'True'
    }
    if '@@' not in cmd:
        cmds['cmd'] = cmd
    else:
        plist = cmd.split('@@')
        cmds['cmd'] = plist[0]
        for i in plist[1:]:
            if '==' in i:
                k, v = i.split('==')
                if k in cmds:
                    cmds[k] = v
                else:
                    log.warning('%s is ignored in command %s' % (i, cmd))
            else:
                log.warning('%s is ignored in command %s' % (i, cmd))
    if cmds['timeout']:
        cmds['timeout'] = int(cmds['timeout'])
    if cmds['check_prompt'].lower() == "false":
        cmds['check_prompt'] = False
    else:
        cmds['check_prompt'] = True

    if cmds['check_ret'].lower() == "false":
        cmds['check_ret'] = False
    else:
        cmds['check_ret'] = True

    if cmds['expect_echo'].lower() == "false":
        cmds['expect_echo'] = False
    else:
        cmds['expect_echo'] = True

    return cmds


def run_uboot_script(cli, commands):
    cli.set_uboot_prompt()
    for cmd in commands:
        commands = parse_command(cmd)
        if commands["cmd"] == "boot":
            cli.sendline("boot")
            check_boot(cli)
        else:
            send_uboot_cmd(cli, **commands)


def run_linux_script(cli, commands):
    cli.set_linux_prompt()
    for cmd in commands:
        commands = parse_command(cmd)
        send_uboot_cmd(cli, **commands)


def get_cli(url):
    cli = TelnetCLI(url, logfile="cli.log")
    return cli

def run_config_file(yaml_file):

    with open(yaml_file, 'r') as fp:
        data_dic = yaml.safe_load(fp)
        log.info("load yaml config successfully, [%s]." % yaml_file)

        log.debug("yaml config file: %s" % yaml_file)
        url = data_dic.get("url")
        log.info("board connect url: %s" % url)
        #run_scripts(data_dic)
        try:
            cli = get_cli(url)
            uboot_cmds = data_dic.get("uboot_commands", None)
            kernel_commands = data_dic.get("kernel_commands", None)
            if uboot_cmds:
                run_uboot_script(cli, uboot_cmds)
            if kernel_commands:
                run_linux_script(cli, kernel_commands)

        finally:
            if cli:
                cli.close()
def main():
    args = parse_args()
    log.info(args)
    print(args.config)
    for config in args.config:
        run_config_file(config)


if __name__ == "__main__":
    main()

 

123.yaml

kernel_commands:
  - uname -a
  - cat /proc/cmdline
  - lsmod
  - fdisk -l

url: "telnet://127.0.0.1:20053"

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐