基于PX4的bootloader和uploader介绍(典藏)
PX4开源飞控系统由飞控软件和QGroundControl地面站组成,支持多种无人机类型。其核心组件包括Bootloader和Uploader:Bootloader是设备端底层软件,负责硬件初始化、固件验证和更新;Uploader是外部工具,用于传输固件并协调升级流程。二者通过通信协议协同工作,共同实现PX4系统的启动和固件更新功能。开发中需重点关注Bootloader的bl.c等源码和Uploa
本文目录
基于PX4的bootloader和uploader介绍
PX4
PX4 是一个开源的飞控系统(Flight Control System,FCS),广泛应用于无人机(UAV)、无人驾驶飞机(UAVs)及其他自动化飞行平台的控制。它提供了一套完整的硬件、软件和开发工具,支持多种类型的飞行器,包括多旋翼、固定翼、直升机、VTOL(垂直起降)等。
PX4系统主要由两部分组成:
- PX4飞控软件:这是 PX4 的核心部分,包含飞行控制算法、导航、传感器处理等功能。它运行在硬件控制板上,能够接收传感器数据(如IMU、GPS等),并控制飞行器的姿态和轨迹。
- QGroundControl:这是一个地面站软件,用于控制和监控 PX4 飞行器。它支持飞行规划、任务管理、飞行日志查看等功能,能够通过无线链路与飞行器进行通信。
PX4 具有高度的模块化和可扩展性,允许开发者根据特定需求定制飞控系统,支持多种传感器、通信协议以及飞行模式。此外,PX4 的开源性质使得全球的开发者和研究人员能够参与其开发和改进。
bootloader和uploader的功能
Bootloader(引导程序)和Uploader(上传工具)是嵌入式系统开发与维护中的两个核心概念,它们协同工作,共同实现了设备的启动和固件更新功能。下面将从定义、功能、工作流程等角度对二者进行详细阐述。
Bootloader(引导程序)
1. 定义与核心角色
Bootloader是存储在微控制器非易失性存储器(如Flash)中的一段底层软件,它是设备上电或复位后首先执行的代码。其核心角色是作为硬件与主应用程序之间的桥梁,负责初始化最基本的硬件,并决定下一步是加载并运行主应用程序,还是进入固件升级模式。
2. 详细功能
Bootloader的功能可以归纳为以下几个关键部分:
-
硬件初始化:在应用程序运行前,Bootloader需完成最基础的硬件环境设置。这通常包括:
- 配置系统时钟和电源管理。
- 初始化必要的外设控制器,如用于调试或通信的串口(USART/SCI)。
- 初始化内存控制器,为后续代码加载到RAM执行做好准备。
-
启动模式判断与应用程序加载:Bootloader会根据特定条件(如检测某个GPIO引脚电平、接收外部命令、或检查Flash特定区域标志位)来决定系统行为。
- 正常启动:如果条件满足(例如,无升级请求),Bootloader会从存储介质(如Flash的应用程序分区)读取主应用程序,将其校验并加载到执行地址(可能是直接跳转,也可能是搬运至RAM),随后将CPU控制权移交。
- 升级模式:如果检测到升级请求(例如,Flash下载分区有有效数据),则可能停留在Bootloader中,等待接收新的固件数据。
-
固件更新与管理:这是Bootloader支持OTA(空中下载)或本地升级的核心能力。
- 固件验证:对新接收或已存储的固件进行完整性校验,常用方法包括CRC校验、哈希验证(如Hash验证)以及检查约定的Magic值。
- 固件处理:对加密或压缩的固件进行解密、解压操作。
- 固件搬运与烧写:将经过验证和处理的固件数据,安全地写入到Flash的应用程序执行区域。此过程通常需要遵循芯片特定的Flash编程流程(如页擦除、写入)。
- 回滚与安全:高级的Bootloader可能支持双备份系统(A/B分区),在一份固件启动失败时自动回滚至另一份,并能在更新前后对固件进行身份认证。
-
通信协议支持:为了与外界(如PC上位机工具、升级服务器)交互,Bootloader需要实现一种或多种通信协议,用于接收命令和固件数据。常见协议包括:
- Xmodem协议:一种经典的、用于异步串行通信的文件传输协议,通过数据块、校验和及应答机制确保传输可靠性,在早期AVR等MCU的Bootloader中广泛应用。
- 自定义串口协议:许多Bootloader使用更简单的自定义命令帧进行交互。
- 网络协议:在支持网络的SoC上(如R-Car Gen3),Bootloader(或由其加载的次级引导程序如U-Boot)可以利用TFTP、HTTP等协议从网络获取固件,实现更高效的网络刷写。
3. 工作流程示例
以搜索结果中一个具备OTA能力的嵌入式设备为例,其Bootloader的工作流程可能如下:
- 上电启动:CPU从Bootloader固定地址开始执行。
- 基础初始化:初始化时钟、必要外设。
- 检查升级标志:读取Flash中的
download分区,检查是否存在有效的升级固件(例如,通过判断文件头部的特定Magic值)。 - 决策分支:
- 若无需升级:直接跳转到主应用程序(
app)分区执行。 - 若需升级:
- 对
download分区的数据依次进行CRC校验、解密、解压。 - 将处理后的数据写入
app分区。 - 对新写入的
app分区数据进行Hash校验以确保完整性。 - 校验成功后,擦除
download分区数据,然后重启设备。
- 对
- 若无需升级:直接跳转到主应用程序(
- 重启后:Bootloader再次运行,此时
download分区已空,遂跳转到已更新完成的app分区,启动新固件。
Uploader(上传工具/下载器)
1. 定义与核心角色
Uploader通常指运行在外部设备(如个人电脑、服务器或手机)上的一个软件工具或程序。它的核心角色是作为固件数据的发送方和升级流程的发起者,负责将编译好的新固件文件,通过特定的通信链路(如串口、USB、网络)传输到目标设备的Bootloader或正在运行的应用中。
2. 详细功能
Uploader的功能围绕“传输”和“协调”展开:
-
提供用户交互界面:Uploader可以是命令行工具(如
px_uploader.py),也可以是带图形界面的桌面程序或网页前端(如Upload Tool)。界面用于让用户选择固件文件、设置通信参数、启动升级过程并查看进度和结果。 -
实现通信与传输协议:Uploader必须实现与目标端Bootloader相匹配的通信协议,以确保数据传输的准确无误。
- 对于串口Bootloader,Uploader需要实现如Xmodem、Ymodem等协议。
- 对于网络Bootloader,Uploader则需要实现TFTP客户端、HTTP客户端等。
- 它负责将大块固件文件分割成协议规定格式的数据包(例如Xmodem的128字节数据块),并依次发送,同时根据接收端的应答(ACK/NAK)进行重传或续传。
-
握手与协调升级流程:在传输开始前,Uploader常需要与目标设备进行“握手”,以确认设备就绪、协议版本兼容,并获取设备信息。
- 例如,一个Web OTA的Uploader前端,会先通过HTTP GET请求获取设备版本和支持的功能列表(握手协议)。
- Uploader还负责协调整个升级会话,包括触发设备进入升级模式(如通过发送特定命令)、按顺序发送数据、在传输结束时发送结束标志,并等待设备确认。
-
处理传输逻辑与错误:管理整个文件传输过程,包括进度显示、超时重试、校验和验证,以及在中断后可能的断点续传能力。
3. Uploader与Bootloader的交互实例
一个清晰的例子是RT-Thread配合Webnet软件包实现的Web OTA:
- Uploader(网页前端):用户通过浏览器访问设备上的网页(Upload Tool界面),点击上传新固件。
- 握手:网页前端(作为Uploader)向设备后端发送一个
/cgi-bin/handshake请求,设备返回其支持的升级功能列表。 - 数据传输:网页前端通过HTTP POST请求,将固件二进制数据分块发送到设备指定的接口(如
/upload/app)。 - Bootloader(或设备端应用程序):设备端运行的程序(可能是应用本身,或一个常驻的服务)接收到数据,按照预先定义的规则(如本例中直接写入Flash的
download分区)存储固件。 - 后续动作:数据传输完成后,设备端程序返回成功响应。设备重启后,独立的Bootloader(如
up_bootloader)开始工作,执行上文所述的检查、校验、搬运流程,完成最终的固件更新。
总结与对比
| 特性 | Bootloader (引导程序) | Uploader (上传工具) |
|---|---|---|
| 运行位置 | 嵌入式设备内部(非易失性存储器)。 | 外部主机(如PC、服务器、手机)。 |
| 核心职责 | 设备端执行者:负责启动、验证、装载、更新设备自身固件。 | 数据发送与流程协调者:负责将固件数据从外部安全可靠地发送至设备。 |
| 主要功能 | 硬件初始化;启动决策;固件校验、解密、搬运、烧录;协议支持(接收端)。 | 用户交互;文件处理;协议支持(发送端);握手与会话管理。 |
| 交互关系 | 响应Uploader的指令,接收其发送的数据,并执行最终更新动作。 | 发起升级流程,向Bootloader发送指令和数据,驱动整个更新过程。 |
| 依赖关系 | 通常不依赖完整的操作系统,代码精简。 | 依赖于外部主机的操作系统和运行环境(如Python、浏览器)。 |
简而言之,Bootloader是设备上负责“启动”和“自我更新”的底层软件,而Uploader是外部用于“交付”新固件并“触发”更新过程的工具。二者通过约定的通信协议协同工作,共同构成了嵌入式系统固件更新机制的基础。在许多现代OTA方案中,设备端运行的主应用程序也可能承担一部分Uploader的功能(即接收网络数据并存入临时分区),而由独立的Bootloader完成最终的固件切换,从而实现更安全、可靠的升级体验。
分析PX4的部分源码
px4的bootloader获取:
git clone https://github.com/PX4/PX4-Bootloader.git
px4项目的获取
git clone https://github.com/PX4/PX4-Autopilot.git
其中,bootloader侧重点关注的bl.c、bl.h、hw_config.h、mian_f4.c等文件的代码,uploader侧重点关注px_uploader.py文件的代码。
Bootloader 和 Uploader 详细交互过程流程图
运行函数级流程图
Uploader (px_uploader.py) 端:
main()
├── parse_args()
├── setup_udp_to_gcs()
├── while True:
│ ├── scan_ports()
│ ├── for port in portlist:
│ │ ├── uploader.__init__()
│ │ ├── uploader.open()
│ │ ├── uploader.identify()
│ │ │ ├── __determineInterface()
│ │ │ ├── __sync()
│ │ │ ├── __getInfo(INFO_BL_REV)
│ │ │ ├── __getInfo(INFO_BOARD_ID)
│ │ │ ├── __getInfo(INFO_BOARD_REV)
│ │ │ └── __getInfo(INFO_FLASH_SIZE)
│ │ ├── uploader.upload()
│ │ │ ├── firmware.__init__()
│ │ │ ├── check_compatibility()
│ │ │ ├── __erase()
│ │ │ ├── __program()
│ │ │ │ └── 循环调用 __program_multi()
│ │ │ ├── __verify_v3() 或 __verify_v2()
│ │ │ └── __reboot()
│ │ └── uploader.close()
Bootloader (bl.c、main.c) 端:
main()
├── board_init()
├── clock_init()
├── check_force_bootloader_pin()
├── check_usb_connection()
├── if (should_boot_app):
│ └── jump_to_app()
├── bootloader(timeout)
│ ├── arch_systic_init()
│ ├── while True:
│ │ ├── cin_wait()
│ │ ├── switch(command):
│ │ │ ├── case PROTO_GET_SYNC:
│ │ │ │ └── sync_response()
│ │ │ ├── case PROTO_GET_DEVICE:
│ │ │ │ ├── 读取参数
│ │ │ │ └── 返回对应信息
│ │ │ ├── case PROTO_CHIP_ERASE:
│ │ │ │ ├── flash_func_erase_sector()
│ │ │ │ └── verify_erase()
│ │ │ ├── case PROTO_PROG_MULTI:
│ │ │ │ ├── 接收数据
│ │ │ │ ├── flash_func_write_word()
│ │ │ │ └── verify_write()
│ │ │ ├── case PROTO_GET_CRC:
│ │ │ │ └── 计算并返回CRC
│ │ │ └── case PROTO_BOOT:
│ │ │ └── 跳转到应用程序
│ │ └── timeout_handling()
Bootloader 协议结构和帧含义
1. 协议版本
- BL_PROTOCOL_VERSION = 5 (bl.c 中定义)
- Uploader 支持协议版本 2-5
2. 命令字节 (Command Bytes)
| 命令 | 值 | 描述 | 协议版本 |
|---|---|---|---|
| PROTO_GET_SYNC | 0x21 | 同步命令,用于建立连接 | v2+ |
| PROTO_GET_DEVICE | 0x22 | 获取设备信息 | v2+ |
| PROTO_CHIP_ERASE | 0x23 | 擦除芯片(应用程序区域) | v2+ |
| PROTO_PROG_MULTI | 0x27 | 编程多个字(4字节对齐) | v2+ |
| PROTO_GET_CRC | 0x29 | 获取CRC校验值 | v3+ |
| PROTO_GET_OTP | 0x2a | 读取OTP区域 | v4+ |
| PROTO_GET_SN | 0x2b | 读取序列号 | v4+ |
| PROTO_GET_CHIP | 0x2c | 获取芯片ID | v5+ |
| PROTO_SET_DELAY | 0x2d | 设置启动延迟 | v5+ |
| PROTO_GET_CHIP_DES | 0x2e | 获取芯片描述(ASCII) | v5+ |
| PROTO_BOOT | 0x30 | 启动应用程序 | v2+ |
| PROTO_DEBUG | 0x31 | 调试命令 | v2+ |
3. 响应字节 (Response Bytes)
| 响应 | 值 | 描述 |
|---|---|---|
| PROTO_INSYNC | 0x12 | 同步字节,每个响应前发送 |
| PROTO_OK | 0x10 | 操作成功 |
| PROTO_FAILED | 0x11 | 操作失败 |
| PROTO_INVALID | 0x13 | 无效命令(v3+) |
| PROTO_BAD_SILICON_REV | 0x14 | 硅版本有问题(v5+) |
4. 帧结构
命令帧格式:
<命令字节> [参数] <EOC(0x20)>
响应帧格式:
[数据] <INSYNC(0x12)> <状态字节>
5. 具体命令详解
5.1 GET_DEVICE 命令
命令格式: 0x22 <参数> 0x20
参数类型:
0x01: 获取bootloader协议版本0x02: 获取板卡ID0x03: 获取板卡版本0x04: 获取固件最大大小0x05: 获取保留向量区域
响应: <4字节数据> 0x12 <状态>
5.2 PROG_MULTI 命令
命令格式: 0x27 <长度字节> <数据字节> 0x20
- 长度字节: 数据字节数(必须是4的倍数)
- 最大长度: 252字节(bl.c中限制为64字节)
响应: 0x12 <状态>
5.3 状态机 (bl_state)
Bootloader 使用状态机确保正确的命令顺序:
#define STATE_PROTO_GET_SYNC 0x1
#define STATE_PROTO_GET_DEVICE 0x2
#define STATE_PROTO_CHIP_ERASE 0x4
#define STATE_PROTO_PROG_MULTI 0x8
#define STATE_PROTO_GET_CRC 0x10
#define STATE_PROTO_BOOT 0x200
状态转换要求:
- 擦除前必须:
GET_SYNC | GET_DEVICE - 重启前必须:
GET_SYNC | GET_DEVICE | PROG_MULTI | GET_CRC
6. 特殊处理
6.1 硅版本检查
- 在 STM32F4 芯片上检查硅版本
- 某些硅版本有硬件问题,需要限制Flash使用
- 通过
check_silicon()函数实现
6.2 启动延迟
- 支持设置启动延迟(秒)
- 存储在特定Flash地址
- 用于给地面站时间连接
6.3 安全引导
- 支持安全引导(SECURE_BTL_ENABLED)
- 使用TOC(表内容)管理多个应用
- 支持签名验证和加密
7. 通信流程示例
-
同步阶段:
Uploader: 0x21 0x20 Bootloader: 0x12 0x10 -
获取信息:
Uploader: 0x22 0x01 0x20 # 获取协议版本 Bootloader: 0x05 0x00 0x00 0x00 0x12 0x10 # 返回协议版本5 -
擦除芯片:
Uploader: 0x23 0x20 Bootloader: 0x12 0x10 # 擦除成功 -
编程数据:
Uploader: 0x27 0x04 0xAA 0xBB 0xCC 0xDD 0x20 # 编程4字节 Bootloader: 0x12 0x10 -
验证CRC:
Uploader: 0x29 0x20 Bootloader: 0x78 0x56 0x34 0x12 0x12 0x10 # 返回CRC -
启动应用:
Uploader: 0x30 0x20 Bootloader: 0x12 0x10
这个协议设计简单而健壮,适合在资源受限的嵌入式环境中使用,确保了固件更新的可靠性和安全性。
支持升级、回退的bootloader和uploader
一、设计思路
1. 核心需求
- 版本管理: 能够存储多个固件版本
- 版本回退: 可以从当前版本回退到之前的版本
- 版本验证: 确保回退的固件是完整可用的
- 安全存储: 防止固件损坏或丢失
2. 存储策略
Flash 布局:
+-----------------------+
| Bootloader (128KB) |
+-----------------------+
| Metadata (8KB) | <- 版本管理信息
+-----------------------+
| Version 3 (512KB) | <- 当前运行版本
+-----------------------+
| Version 2 (512KB) | <- 上一个版本
+-----------------------+
| Version 1 (512KB) | <- 历史版本
+-----------------------+
| Rollback Temp (512KB) | <- 回退临时存储
+-----------------------+
二、详细设计
1. 数据结构
1.1 固件版本元数据
// metadata.h
#pragma once
#include <stdint.h>
#include <stdbool.h>
#define MAX_VERSION_COUNT 3
#define VERSION_MAGIC 0x56455253 // "VERS"
typedef struct {
uint32_t magic; // 魔数,用于验证数据结构
uint32_t version; // 版本号 (主版本 << 24 | 次版本 << 16 | 修订版本)
uint32_t timestamp; // 编译时间戳
uint32_t crc32; // 固件CRC校验
uint32_t size; // 固件大小
uint32_t load_address; // 加载地址
uint32_t entry_point; // 入口地址
uint32_t rollback_count; // 回退次数计数
uint8_t board_id; // 板卡ID
uint8_t board_rev; // 板卡版本
uint8_t status; // 状态: 0=空, 1=有效, 2=当前运行, 3=损坏
uint8_t reserved; // 保留
} firmware_version_t;
typedef struct {
firmware_version_t versions[MAX_VERSION_COUNT];
uint8_t current_version_idx; // 当前运行版本的索引
uint8_t last_valid_version_idx; // 最后一个有效版本的索引
uint16_t update_counter; // 更新计数器
uint32_t metadata_crc; // 元数据CRC
} version_metadata_t;
1.2 扩展的 Bootloader 协议命令
// bl_protocol.h
#define PROTO_GET_VERSION_LIST 0x32 // 获取版本列表
#define PROTO_GET_VERSION_INFO 0x33 // 获取特定版本信息
#define PROTO_SET_ACTIVE_VERSION 0x34 // 设置活动版本
#define PROTO_ROLLBACK_VERSION 0x35 // 回退到指定版本
#define PROTO_ERASE_VERSION 0x36 // 擦除特定版本
#define PROTO_GET_METADATA 0x37 // 获取元数据信息
#define PROTO_BACKUP_CURRENT 0x38 // 备份当前版本
#define PROTO_VERIFY_VERSION 0x39 // 验证版本完整性
2. Bootloader 增强设计
2.1 版本管理模块
// version_manager.c
#include "metadata.h"
#include "flash_manager.h"
// 版本元数据存储地址
#define METADATA_ADDRESS 0x08020000
#define VERSION_BASE_ADDRESS 0x08020800
// 每个版本的空间大小
#define VERSION_SLOT_SIZE (512 * 1024) // 512KB
// 初始化版本管理
int version_manager_init(void) {
version_metadata_t metadata;
// 读取元数据
if (!flash_read(METADATA_ADDRESS, (uint8_t*)&metadata, sizeof(metadata))) {
return -1;
}
// 检查魔数
if (metadata.metadata_crc != calculate_crc32((uint8_t*)&metadata, sizeof(metadata) - 4)) {
// CRC校验失败,初始化元数据
init_metadata();
}
return 0;
}
// 获取当前活动版本
firmware_version_t* get_current_version(void) {
version_metadata_t metadata;
flash_read(METADATA_ADDRESS, (uint8_t*)&metadata, sizeof(metadata));
if (metadata.current_version_idx < MAX_VERSION_COUNT) {
return &metadata.versions[metadata.current_version_idx];
}
return NULL;
}
// 回退到指定版本
int rollback_to_version(uint8_t target_idx) {
version_metadata_t metadata;
// 读取元数据
flash_read(METADATA_ADDRESS, (uint8_t*)&metadata, sizeof(metadata));
// 检查目标版本是否有效
if (target_idx >= MAX_VERSION_COUNT ||
metadata.versions[target_idx].status != 1) {
return -1;
}
// 备份当前版本到临时存储
firmware_version_t* current = get_current_version();
if (current && current->status == 2) {
if (!backup_version(current, ROLLBACK_TEMP_ADDRESS)) {
return -2;
}
}
// 设置新活动版本
metadata.versions[target_idx].status = 2;
metadata.versions[target_idx].rollback_count++;
if (current) {
current->status = 1; // 标记为历史版本
}
metadata.current_version_idx = target_idx;
metadata.update_counter++;
// 保存元数据
metadata.metadata_crc = calculate_crc32((uint8_t*)&metadata, sizeof(metadata) - 4);
flash_erase_sector(METADATA_ADDRESS);
flash_write(METADATA_ADDRESS, (uint8_t*)&metadata, sizeof(metadata));
return 0;
}
// 验证固件完整性
bool verify_firmware_integrity(firmware_version_t* version) {
uint32_t calculated_crc = 0;
uint32_t address = VERSION_BASE_ADDRESS + (version->load_address * VERSION_SLOT_SIZE);
// 计算CRC
calculated_crc = calculate_crc32((uint8_t*)address, version->size);
return (calculated_crc == version->crc32);
}
2.2 扩展的 Bootloader 主循环
// bl_enhanced.c
#include "version_manager.h"
#include "bl_protocol.h"
void enhanced_bootloader(unsigned timeout) {
// 初始化版本管理
version_manager_init();
while (true) {
int cmd = get_command();
switch (cmd) {
case PROTO_GET_VERSION_LIST:
handle_get_version_list();
break;
case PROTO_GET_VERSION_INFO:
handle_get_version_info();
break;
case PROTO_SET_ACTIVE_VERSION:
handle_set_active_version();
break;
case PROTO_ROLLBACK_VERSION:
handle_rollback_version();
break;
case PROTO_ERASE_VERSION:
handle_erase_version();
break;
case PROTO_BACKUP_CURRENT:
handle_backup_current();
break;
case PROTO_VERIFY_VERSION:
handle_verify_version();
break;
// 原有命令处理保持不变
case PROTO_GET_SYNC:
case PROTO_GET_DEVICE:
// ... 其他原有命令
default:
send_invalid_response();
break;
}
}
}
// 处理获取版本列表命令
void handle_get_version_list(void) {
version_metadata_t metadata;
flash_read(METADATA_ADDRESS, (uint8_t*)&metadata, sizeof(metadata));
// 发送版本数量
send_response_byte(MAX_VERSION_COUNT);
// 发送每个版本的基本信息
for (int i = 0; i < MAX_VERSION_COUNT; i++) {
firmware_version_t* ver = &metadata.versions[i];
send_response_word(ver->version); // 版本号
send_response_word(ver->timestamp); // 时间戳
send_response_byte(ver->status); // 状态
send_response_byte(ver->rollback_count); // 回退次数
}
send_sync_response();
}
// 处理版本回退命令
void handle_rollback_version(void) {
uint8_t target_version = get_command_byte();
int result = rollback_to_version(target_version);
if (result == 0) {
send_sync_response();
// 延迟后重启
delay(100);
jump_to_app();
} else {
send_failure_response();
}
}
3. Uploader 增强设计
3.1 支持版本回退的 Uploader 类
# enhanced_uploader.py
import struct
import json
from enum import Enum
class VersionStatus(Enum):
EMPTY = 0
VALID = 1
ACTIVE = 2
CORRUPTED = 3
class EnhancedUploader(uploader):
"""支持版本回退的增强型上传器"""
# 扩展的命令字节
GET_VERSION_LIST = b'\x32'
GET_VERSION_INFO = b'\x33'
SET_ACTIVE_VERSION = b'\x34'
ROLLBACK_VERSION = b'\x35'
ERASE_VERSION = b'\x36'
GET_METADATA = b'\x37'
BACKUP_CURRENT = b'\x38'
VERIFY_VERSION = b'\x39'
def __init__(self, portname, baudrate_bootloader, baudrate_flightstack):
super().__init__(portname, baudrate_bootloader, baudrate_flightstack)
self.version_metadata = None
def get_version_list(self):
"""获取版本列表"""
self.__send(self.GET_VERSION_LIST + self.EOC)
version_count = ord(self.__recv())
versions = []
for i in range(version_count):
version_data = {
'index': i,
'version': self.__recv_int(),
'timestamp': self.__recv_int(),
'status': ord(self.__recv()),
'rollback_count': ord(self.__recv())
}
versions.append(version_data)
self.__getSync()
return versions
def get_version_info(self, version_idx):
"""获取特定版本的详细信息"""
self.__send(self.GET_VERSION_INFO +
struct.pack('B', version_idx) +
self.EOC)
info = {
'version': self.__recv_int(),
'timestamp': self.__recv_int(),
'crc32': self.__recv_int(),
'size': self.__recv_int(),
'board_id': ord(self.__recv()),
'board_rev': ord(self.__recv()),
'status': ord(self.__recv()),
'rollback_count': ord(self.__recv())
}
self.__getSync()
return info
def rollback_to_version(self, target_idx, force=False):
"""回退到指定版本"""
if not force:
# 获取目标版本信息
target_info = self.get_version_info(target_idx)
# 获取当前版本信息
versions = self.get_version_list()
current_version = next((v for v in versions if v['status'] == VersionStatus.ACTIVE.value), None)
if current_version:
current_info = self.get_version_info(current_version['index'])
print(f"当前版本: {self._format_version(current_info['version'])}")
print(f"目标版本: {self._format_version(target_info['version'])}")
# 检查是否真的是回退
if target_info['version'] >= current_info['version']:
print("警告:目标版本不比当前版本旧,这可能不是回退操作!")
confirm = input("确认要回退到这个版本吗?(y/N): ")
if confirm.lower() != 'y':
return False
# 执行回退
self.__send(self.ROLLBACK_VERSION +
struct.pack('B', target_idx) +
self.EOC)
try:
self.__getSync()
print(f"回退到版本 {target_idx} 成功!")
return True
except RuntimeError as e:
print(f"回退失败: {e}")
return False
def backup_current_version(self):
"""备份当前版本"""
self.__send(self.BACKUP_CURRENT + self.EOC)
try:
self.__getSync()
# 获取备份结果
backup_info = {
'backup_slot': ord(self.__recv()),
'backup_crc': self.__recv_int(),
'backup_size': self.__recv_int()
}
self.__getSync()
print(f"备份成功,存储在槽位 {backup_info['backup_slot']}")
return backup_info
except RuntimeError as e:
print(f"备份失败: {e}")
return None
def verify_version_integrity(self, version_idx):
"""验证版本完整性"""
self.__send(self.VERIFY_VERSION +
struct.pack('B', version_idx) +
self.EOC)
result = ord(self.__recv())
crc_match = (result & 0x01) != 0
size_match = (result & 0x02) != 0
magic_match = (result & 0x04) != 0
self.__getSync()
return {
'crc_match': crc_match,
'size_match': size_match,
'magic_match': magic_match,
'is_valid': crc_match and size_match and magic_match
}
def enhanced_upload(self, fw_path, keep_previous=True, backup_before=True):
"""增强的上传方法,支持版本管理"""
# 1. 备份当前版本(如果需要)
if backup_before:
print("备份当前版本...")
backup_info = self.backup_current_version()
if not backup_info:
print("警告:备份失败,继续操作可能有风险")
# 2. 检查可用的版本槽位
versions = self.get_version_list()
empty_slots = [v['index'] for v in versions if v['status'] == VersionStatus.EMPTY.value]
if not empty_slots and not keep_previous:
# 需要擦除一个旧版本
print("没有可用的版本槽位")
# 提示用户选择要覆盖的版本
for v in versions:
if v['status'] != VersionStatus.ACTIVE.value:
print(f"槽位 {v['index']}: 版本 {self._format_version(v['version'])}")
try:
slot_to_erase = int(input("请选择要覆盖的槽位: "))
self.erase_version(slot_to_erase)
except ValueError:
print("无效选择,使用第一个非活动槽位")
slot_to_erase = next(v['index'] for v in versions if v['status'] != VersionStatus.ACTIVE.value)
self.erase_version(slot_to_erase)
else:
slot_to_use = empty_slots[0] if empty_slots else None
# 3. 加载新固件
fw = firmware(fw_path)
# 4. 计算目标地址(基于槽位)
slot_offset = slot_to_use * self.VERSION_SLOT_SIZE
base_address = self.VERSION_BASE_ADDRESS + slot_offset
# 5. 执行编程
print(f"编程到槽位 {slot_to_use}...")
self.program_to_address(fw, base_address)
# 6. 更新元数据
self.update_version_metadata(slot_to_use, fw)
# 7. 设置为新活动版本
self.set_active_version(slot_to_use)
print("上传完成,新版本已激活")
def _format_version(self, version_int):
"""格式化版本号"""
major = (version_int >> 24) & 0xFF
minor = (version_int >> 16) & 0xFF
patch = version_int & 0xFFFF
return f"{major}.{minor}.{patch}"
def update_version_metadata(self, slot_idx, fw):
"""更新版本元数据"""
version_info = {
'version': self._parse_firmware_version(fw),
'timestamp': int(time.time()),
'crc32': fw.crc(),
'size': len(fw.image),
'board_id': fw.property('board_id'),
'board_rev': fw.property('board_revision'),
'status': VersionStatus.VALID.value,
'rollback_count': 0
}
# 这里需要实现与bootloader的元数据更新协议
# 实际实现可能需要多个命令调用
3.2 增强的 CLI 界面
# cli_enhanced.py
import argparse
import sys
from enhanced_uploader import EnhancedUploader
def main():
parser = argparse.ArgumentParser(description="支持版本回退的固件上传器")
subparsers = parser.add_subparsers(dest='command', help='命令')
# 上传命令
upload_parser = subparsers.add_parser('upload', help='上传新固件')
upload_parser.add_argument('--port', required=True, help='串口设备')
upload_parser.add_argument('--firmware', required=True, help='固件文件')
upload_parser.add_argument('--keep-previous', action='store_true', help='保留之前的版本')
upload_parser.add_argument('--no-backup', action='store_true', help='不备份当前版本')
# 列表命令
list_parser = subparsers.add_parser('list', help='列出所有版本')
list_parser.add_argument('--port', required=True, help='串口设备')
# 回退命令
rollback_parser = subparsers.add_parser('rollback', help='回退到指定版本')
rollback_parser.add_argument('--port', required=True, help='串口设备')
rollback_parser.add_argument('--version', type=int, required=True, help='版本索引')
rollback_parser.add_argument('--force', action='store_true', help='强制回退')
# 验证命令
verify_parser = subparsers.add_parser('verify', help='验证版本完整性')
verify_parser.add_argument('--port', required=True, help='串口设备')
verify_parser.add_argument('--version', type=int, help='版本索引(默认所有)')
# 备份命令
backup_parser = subparsers.add_parser('backup', help='备份当前版本')
backup_parser.add_argument('--port', required=True, help='串口设备')
args = parser.parse_args()
if args.command == 'upload':
upload_firmware(args)
elif args.command == 'list':
list_versions(args)
elif args.command == 'rollback':
rollback_version(args)
elif args.command == 'verify':
verify_versions(args)
elif args.command == 'backup':
backup_version(args)
else:
parser.print_help()
def upload_firmware(args):
"""上传新固件"""
up = EnhancedUploader(args.port, 115200, [57600])
try:
up.open()
up.identify()
print("设备信息:")
print(f" 板卡ID: {up.board_type}")
print(f" 板卡版本: {up.board_rev}")
print(f" Flash大小: {up.fw_maxsize}")
# 显示当前版本信息
versions = up.get_version_list()
print("\n当前存储的版本:")
for ver in versions:
status_str = {0: "空", 1: "有效", 2: "活动", 3: "损坏"}.get(ver['status'], "未知")
print(f" 槽位{ver['index']}: 版本{up._format_version(ver['version'])} [{status_str}]")
# 执行上传
up.enhanced_upload(
args.firmware,
keep_previous=args.keep_previous,
backup_before=not args.no_backup
)
except Exception as e:
print(f"错误: {e}")
sys.exit(1)
finally:
up.close()
def list_versions(args):
"""列出所有版本"""
up = EnhancedUploader(args.port, 115200, [57600])
try:
up.open()
up.identify()
versions = up.get_version_list()
print("版本列表:")
print("索引 | 版本号 | 状态 | 回退次数 | 时间戳")
print("-" * 60)
for ver in versions:
version_str = up._format_version(ver['version'])
status_str = {0: "空", 1: "有效", 2: "活动", 3: "损坏"}.get(ver['status'], "未知")
timestamp = time.strftime('%Y-%m-%d %H:%M', time.localtime(ver['timestamp']))
print(f"{ver['index']:4} | {version_str:12} | {status_str:4} | {ver['rollback_count']:8} | {timestamp}")
except Exception as e:
print(f"错误: {e}")
sys.exit(1)
finally:
up.close()
def rollback_version(args):
"""回退到指定版本"""
up = EnhancedUploader(args.port, 115200, [57600])
try:
up.open()
up.identify()
success = up.rollback_to_version(args.version, args.force)
if success:
print("回退成功!设备将重启...")
else:
print("回退失败")
sys.exit(1)
except Exception as e:
print(f"错误: {e}")
sys.exit(1)
finally:
up.close()
def verify_versions(args):
"""验证版本完整性"""
up = EnhancedUploader(args.port, 115200, [57600])
try:
up.open()
up.identify()
versions = up.get_version_list()
if args.version is not None:
versions = [v for v in versions if v['index'] == args.version]
print("版本验证结果:")
print("索引 | 版本号 | CRC | 大小 | 魔数 | 总体")
print("-" * 60)
for ver in versions:
if ver['status'] != 0: # 不是空槽位
result = up.verify_version_integrity(ver['index'])
version_str = up._format_version(ver['version'])
print(f"{ver['index']:4} | {version_str:12} | "
f"{'✓' if result['crc_match'] else '✗':5} | "
f"{'✓' if result['size_match'] else '✗':5} | "
f"{'✓' if result['magic_match'] else '✗':5} | "
f"{'有效' if result['is_valid'] else '无效':4}")
except Exception as e:
print(f"错误: {e}")
sys.exit(1)
finally:
up.close()
def backup_version(args):
"""备份当前版本"""
up = EnhancedUploader(args.port, 115200, [57600])
try:
up.open()
up.identify()
backup_info = up.backup_current_version()
if backup_info:
print(f"备份成功:")
print(f" 存储槽位: {backup_info['backup_slot']}")
print(f" 备份大小: {backup_info['backup_size']} 字节")
print(f" CRC32: 0x{backup_info['backup_crc']:08x}")
else:
print("备份失败")
except Exception as e:
print(f"错误: {e}")
sys.exit(1)
finally:
up.close()
if __name__ == '__main__':
main()
三、安全性和可靠性考虑
1. 原子性操作
- 所有元数据更新都是原子操作
- 使用CRC校验确保数据完整性
- 操作失败时自动回滚
2. 电源故障恢复
- 元数据中包含操作标记
- 重启时检查未完成的操作
- 自动恢复或清理损坏的数据
3. 版本验证
- 编程后自动验证CRC
- 启动前检查固件完整性
- 提供手动验证命令
四、使用示例
# 列出所有版本
python cli_enhanced.py list --port /dev/ttyACM0
# 上传新固件(自动备份当前版本)
python cli_enhanced.py upload --port /dev/ttyACM0 --firmware firmware_v1.2.0.px4
# 回退到版本1
python cli_enhanced.py rollback --port /dev/ttyACM0 --version 1
# 验证所有版本完整性
python cli_enhanced.py verify --port /dev/ttyACM0
# 备份当前版本
python cli_enhanced.py backup --port /dev/ttyACM0
五、总结
这个设计提供了完整的版本管理功能:
- 多版本存储: 支持存储多个固件版本
- 安全回退: 可以从当前版本回退到任何有效的历史版本
- 完整性验证: 所有版本都经过CRC验证
- 操作安全: 关键操作都有确认提示
- 易于使用: 提供直观的CLI界面
这个设计可以显著提高固件更新的可靠性,特别是在需要快速回退到稳定版本的场景中。
更多推荐



所有评论(0)