实现功能:

    1.从FTP下载最新线刷包。
    2.检测adb连接是否正常。
    3.进行adb root
    4.push_file()把本地升级包EP40_IMAGES_ALL.zip推送到/data/update/temp/目录下。
    5.用updatecmd()升级/data/update/temp/目录下的EP40_IMAGES_ALL.zip升级包。

获取最新升级包到本地

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import urllib.request
import re
from ftplib import FTP
from datetime import datetime
import time

def current_time():
    return datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

class FTPDownLoad:
    def __init__(self):  # 初始化
        self.host = "10.4.12.203"  # ftp主机ip
        self.username = "Anonymous"  # ftp用户名
        self.password = ""  # ftp密码
        self.port = 2112  # ftp端口 (默认21)
        self.ftp = FTP()

    def connect(self):  # 连接FTP server
        self.ftp.encoding = 'GBK'  # 设置编码,解决上传的文件包含中文的问题
        self.ftp.set_debuglevel(0)  # 调试模式
        self.ftp.connect(host=self.host, port=self.port)  # 连接ftp
        self.ftp.login(self.username, self.password)  # 登录ftp
        self.ftp.set_pasv(True)  # 打开被动模式
        # print(self.ftp.getwelcome())  # 显示登录ftp信息

    def quit(self):  # 断开连接FTP server
        self.ftp.quit()

    def download_file(self, ftp_file_path):
        self.ftp.cwd(ftp_file_path)
        self.entries = list(self.ftp.mlsd())
        print(self.entries)
        # Only interested in directories
        self.entries = [entry for entry in self.entries if entry[1]["type"] == "dir"]
        # Sort by timestamp
        self.entries.sort(key=lambda entry: entry[1]['modify'], reverse=True)
        # Pick the first one
        latest_name = self.entries[0][0]
        print(latest_name)

        dir_name = ftp_file_path + "/" + latest_name
        self.ftp.cwd(dir_name)
        self.entries = list(self.ftp.mlsd())
        print(self.entries)
        # Only interested in directories
        entries = [entry for entry in self.entries if entry[1]["type"] == "file" and re.match(r'.*_ALL.zip', entry[0])]
        print("匹配最新文件:", entries)
        # Pick the first one
        latest_file_name = entries[0][0]
        print("最新文件名:", latest_file_name)

        # EP40R项目路径
        local_EP40 = os.path.join(r'/home/devops/tools/autoupdatesystem/updatePackage', latest_file_name)

        # EP40项目路径
        print('self.host:',self.host)
        print('self.port:',self.port)
        self.url_EP40 = "ftp://10.4.12.203:2112" + dir_name + "/" + latest_file_name
        print(str(self.url_EP40))

        urllib.request.urlretrieve(str(self.url_EP40), local_EP40)


# 程序入口
if __name__ == '__main__':
    ftp_file_path = "/EP40/IHU_Release/AndroidR/UAT/"  # FTP目录 遍历此目录内的文件
    # ftp_file_key = ['update.zip', 'ReleaseNote.xls', 'update-mcu.*.zip']  # 以正则表达式形式表示的关键字列表,只下载此列表内的文件
    FTP_Download = FTPDownLoad()
    wait_time = 10
    try:
        print('start one loop package download process in %s' % current_time())
        FTP_Download.connect()
        FTP_Download.download_file(ftp_file_path)
        FTP_Download.quit()
        print('finish one loop package download process in %s' % current_time())
        time.sleep(wait_time)
    except Exception as e:
        print(current_time()+': '+str(e))
        time.sleep(wait_time)

 升级安卓车机版本

# -*- coding: utf-8 -*-
import os
import sys
import time
from datetime import datetime



class AutoUpdateSystem:
    '''
    实现功能:
    通过adb命令的方式将本地的update.zip升级包上传到挂载到qnx的目录下‘/qnx’,然后通过shell脚本将升级命令写入到‘/qnx/update/recovery/command’里;
    通过命令reboot recovery重启进入升级模式进行升级。
    '''

    def adb_root(self):
        self.adb_root_result = os.system('adb root')
        if self.adb_root_result != 0:
            print(current_time(), '----------adb root failed! ----------')
        print(current_time(), '----------adb root passed! ----------')
    def mount(self):
        self.adb_mount = os.system('adb shell /sbin/busybox mount -t nfs -o nolock 172.16.1.69:/ /qnx')
        time.sleep(3)
        if self.adb_mount != 0:
            print(current_time(), '----------adb mount failed! ----------')
        print(current_time(), '----------adb mount passed! ----------')

    def create_file(self):
        self.mkdir_file = os.system('adb shell mkdir -p /qnx/update/temp')
        if self.mkdir_file != 0:
            print(current_time(), '----------mkdir temp failed! ----------')
        print(current_time(), '----------mkdir temp passed! ----------')

    def chmod_file(self):
        self.change_power = os.system('adb shell chmod 777 /qnx/update/temp')
        if self.change_power != 0:
            print(current_time(), '----------chmod file failed! ----------')
        print(current_time(), '----------chmod file passed! ----------')

    def push_package_file(self, fname):
        self.push_update_package = os.system('adb push {0} /qnx/update/temp/update.zip'.format(os.path.join( sys.path[0], fname)))
        if self.push_update_package != 0:
            print(current_time(), '----------push package file failed! ----------')
        print(current_time(), '----------push package file passed! ----------')

    def push_script(self):
        self.rm_script = os.system('adb shell rm -rf /qnx/update/recovery/command')
        self.push_script = os.system('adb push {0} /qnx/update/recovery/'.format(os.path.join(sys.path[0], 'Script', 'updateCommand.sh')))
        print(os.path.join(sys.path[0], 'Script', 'updateCommand.sh'))
        if self.push_script != 0:
            print(current_time(), '----------push script failed! ----------')
        print(current_time(), '----------push script passed! ----------')

    def exec_script(self):
        self.exec_script = os.system('adb shell sh /qnx/update/recovery/updateCommand.sh')
        if self.exec_script != 0:
            print(current_time(), '----------exec script failed! ----------')
        print(current_time(), '----------exec script passed! ----------')

    def exec_sync(self):
        self.sync = os.system('adb shell sync')
        if self.sync != 0:
            print(current_time(), '----------exec sync failed! ----------')
        print(current_time(), '----------exec sync passed! ----------')

    def reboot_recovery(self):
        self.rboot_recovery = os.system('adb shell reboot recovery')
        print(current_time(), '----------reboot_recovery ----------')

def current_time():
    return datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

def TestAdbConnect():
    while True:
        time.sleep(3)
        ConnectStatus = os.system('adb shell ls')
        if ConnectStatus == 0:
            print('adb可正常连接.')
            break
        else:
            print('adb连接失败,请到系统设置->版本->关闭/开启adb模式')

class E40UpdateSystem:
    '''
    实现功能:
    1.检测adb连接是否正常。
    2.进行adb root
    3.push_file()把本地升级包EP40_IMAGES_ALL.zip推送到/data/update/temp/目录下。
    4.用updatecmd()升级/data/update/temp/目录下的EP40_IMAGES_ALL.zip升级包。
    '''
    def create_file(self, file_path):
        self.mkdir_file = os.system('adb shell mkdir -p {0}'.format(file_path))
        if self.mkdir_file != 0:
            print(current_time(), '----------mkdir temp failed! ----------')
        print(current_time(), '----------mkdir temp passed! ----------')

    def chmod_file(self, file_path):
        self.change_power = os.system('adb shell chmod 777 {0}/*'.format(file_path))
        if self.change_power != 0:
            print(current_time(), '----------chmod file failed! ----------')
        print(current_time(), '----------chmod file passed! ----------')

    def push_file(self, fname, file_path):
        self.rm_update_package = os.system('adb shell rm -rf {0}*'.format(file_path))
        self.push_update_package = os.system('adb push {0} {1}'.format(os.path.join(sys.path[0], fname), file_path))
        if self.push_update_package != 0:
            print(current_time(), '----------push package file failed! ----------')
        print(current_time(), '----------push package file passed! ----------')

    def updatecmd(self, file_path):
        self.update_cmd = os.system('adb shell am broadcast -a com.hezon.update.action --es "path" "{0}/EP40_IMAGES_ALL.zip"'.format(file_path))
        if self.update_cmd != 0:
            print(current_time(), '----------update package file failed! ----------')
        print(current_time(), '----------update package file passed! ----------')


if __name__ == '__main__':
    file_path = '/data/update/temp/'
    print(current_time(), '----------start update system ----------')
    TestAdbConnect()                                                    # 检测adb是否正常连接。
    time.sleep(2)
    EP31cmd = AutoUpdateSystem()
    EP31cmd.adb_root()                                                  # 进行adb root
    time.sleep(2)
    EP40cmd = E40UpdateSystem()
    EP40cmd.create_file(file_path)                                      # 给升级包增加可读可写权限
    time.sleep(2)
    fname = os.path.join('updatePackage', 'EP40_IMAGES_ALL.zip')
    print(fname)
    if sys.argv[1] is not None and os.path.exists(sys.argv[1]):
        fname = sys.argv[1]
    EP40cmd.push_file(sys.argv[1], file_path)                                       # 把updatePackage目录下的升级包推送到‘/mnt/user/0/emulated/’目录里
    time.sleep(2)
    EP40cmd.chmod_file(file_path)
    time.sleep(2)
    EP40cmd.updatecmd(file_path)                                                  # 执行升级指令,进行升级。
    time.sleep(600)
    print(current_time(), '----------End update system ----------')


Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐