基于C++17的IMU数据生产者-消费者模型的实现
·
背景
IMU传感器以高频(如500Hz)向外发布包含 seqid(序列号)、timestamp(时间戳)以及姿态数据(Roll, Pitch, Yaw)的数据包。系统中有两个消费者节点:
- 导航节点(Navigation Node): 负责定位。它追求实时性,只关心当前最新的姿态,允许中间有少量丢包,不需要处理历史数据。
- 感知节点(Perception Node): 负责视觉惯性里程计(VIO)。它追求数据完整性,要求IMU数据必须严格连续(
seqid连续)。如果检测到丢包,必须拒绝该帧数据并触发重同步,否则会导致视觉追踪失败。
需求
请设计一个线程安全的 ImuManager 类,实现以下功能:
- 能够接收并缓存最新的IMU数据包(模拟从底层驱动获取)。
- 为导航节点提供获取最新数据的接口。
- 为感知节点提供带有
seqid连续性校验的获取接口。 - 保证多线程环境下的数据安全,且尽量降低对高频数据更新的阻塞。
💡 实现
设计思路
- 读写锁(
std::shared_mutex): IMU更新频率极高,而消费者读取频率相对较低。使用读写锁可以让多个消费者同时读取,只有在底层更新数据时才加独占锁,极大地提升了并发性能。 - 差异化返回: 导航节点直接拿数据;感知节点通过
std::optional返回,如果校验失败(丢包),则返回空值。
#include <iostream> |
|
#include <thread> |
|
#include <mutex> |
|
#include <shared_mutex> // C++17 读写锁 |
|
#include <optional> // C++17 可选值 |
|
#include <chrono> |
|
// 1. IMU 数据结构 |
|
struct ImuData { |
|
int seqid; // 序列号 |
|
double timestamp; // 时间戳 |
|
double roll, pitch, yaw; // 姿态角 |
|
}; |
|
// 2. IMU 数据管理器 |
|
class ImuManager { |
|
public: |
|
ImuManager() : last_seq_id_(-1) {} |
|
// 【生产者调用】模拟底层驱动推送最新的IMU数据 |
|
void updateData(const ImuData& new_data) { |
|
// 使用独占锁(写锁),因为要修改共享数据 |
|
std::unique_lock<std::shared_mutex> lock(mutex_); |
|
latest_data_ = new_data; |
|
} |
|
// 【导航节点调用】获取最新数据,容忍丢包,追求实时 |
|
ImuData getLatestData() { |
|
// 使用共享锁(读锁),允许多个消费者同时读取 |
|
std::shared_lock<std::shared_mutex> lock(mutex_); |
|
return latest_data_; |
|
} |
|
// 【感知节点调用】获取连续数据,校验 seqid,如果丢包则返回空 |
|
std::optional<ImuData> getValidatedData() { |
|
std::shared_lock<std::shared_mutex> lock(mutex_); |
|
// 处理初始状态 |
|
if (-1 == last_seq_id_) { |
|
last_seq_id_ = latest_data_.seqid; |
|
return latest_data_; |
|
} |
|
// 强转为 unsigned int 处理 int 溢出 |
|
if (static_cast<unsigned int>(latest_data_.seqid - last_seq_id_) != 1) { |
|
std::cerr << "[感知节点警告] 检测到丢包! 期望seq: " << last_seq_id_ + 1 |
|
<< ", 实际收到seq: " << latest_data_.seqid << ",拒绝该帧数据!\n"; |
|
return std::nullopt; |
|
} |
|
// 数据连续,更新本地记录的 seqid 并返回数据 |
|
last_seq_id_ = latest_data_.seqid; |
|
return latest_data_; |
|
} |
|
private: |
|
mutable std::shared_mutex mutex_; // 读写锁,保护 latest_data_ |
|
ImuData latest_data_; // 缓存的最新IMU数据 |
|
int last_seq_id_; // 记录感知节点上一次成功消费的 seqid |
|
}; |
|
// 3. 模拟导航节点(要求最新,容忍丢包) |
|
void navigationNode(ImuManager& manager) { |
|
while (true) { |
|
ImuData data = manager.getLatestData(); |
|
// 导航算法直接接收最新观测值,即使中间丢了几个包也能通过EKF协方差自适应 |
|
std::cout << "[导航节点] 融合最新IMU -> seq: " << data.seqid |
|
<< ", Yaw: " << data.yaw << "\n"; |
|
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟100Hz运行 |
|
} |
|
} |
|
// 4. 模拟感知节点(要求严格连续,丢包则报警/重同步) |
|
void perceptionNode(ImuManager& manager) { |
|
while (true) { |
|
auto data_opt = manager.getValidatedData(); |
|
if (data_opt) { |
|
// 数据连续,进行VIO视觉惯性里程计计算 |
|
ImuData data = data_opt.value(); |
|
std::cout << "[感知节点] VIO计算正常 -> seq: " << data.seqid << "\n"; |
|
} |
|
else { |
|
// 数据不连续,触发重同步逻辑(例如:重置光流追踪器或等待下一个关键帧) |
|
std::cout << "[感知节点] 触发重同步机制...\n"; |
|
} |
|
std::this_thread::sleep_for(std::chrono::milliseconds(33)); // 模拟30Hz运行(与相机同步) |
|
} |
|
} |
|
// 5. 模拟底层驱动(生产者) |
|
void mockImuDriver(ImuManager& manager) { |
|
int seq = 0; |
|
while (true) { |
|
ImuData data; |
|
data.seqid = seq++; |
|
data.timestamp = std::chrono::duration_cast<std::chrono::microseconds>( |
|
std::chrono::steady_clock::now().time_since_epoch()).count(); |
|
data.roll = 0.1; |
|
data.pitch = 0.2; |
|
data.yaw = (seq % 360) * 1.0; |
|
manager.updateData(data); |
|
// 模拟偶尔丢包(例如每15帧丢一次,实际串口读取中可能发生) |
|
if (0 == seq % 15) { |
|
std::cout << "--- [底层驱动模拟] 发生一次丢包 ---\n"; |
|
++seq; // 跳过一個 seqid |
|
} |
|
std::this_thread::sleep_for(std::chrono::milliseconds(2)); // 模拟500Hz高频输出 |
|
} |
|
} |
|
int main() { |
|
ImuManager imu; |
|
// 启动生产者(模拟驱动)和两个消费者线程 |
|
std::thread driver_thread(mockImuDriver, std::ref(imu)); |
|
std::thread nav_thread(navigationNode, std::ref(imu)); |
|
std::thread perc_thread(perceptionNode, std::ref(imu)); |
|
driver_thread.join(); |
|
nav_thread.join(); |
|
perc_thread.join(); |
|
return 0; |
|
} |
🔑 总结
- 读写锁(
std::shared_mutex)的应用:
相比于普通的std::mutex,读写锁非常适合这种“单生产者-多消费者”且“读多写少”(IMU更新极快,但消费者读取相对较慢)的场景。它能让导航节点和感知节点同时读取数据,互不阻塞,只有在驱动层更新latest_data_的瞬间才会加锁。 std::optional的使用:
在 C++17 中,使用std::optional来返回可能失败的结果是非常现代且安全的做法。它明确地告诉调用者:“这个函数不一定有返回值”,比传统的“返回 bool + 引用传参”或者“返回 -1 等错误码”更加清晰。- 业务逻辑的解耦:
代码清晰地展示了导航和感知对数据质量的不同容忍度。
更多推荐

所有评论(0)