保姆级教程:用OpenDDS C++ 从IDL定义到RTPS通信,手把手搭建你的第一个分布式消息系统
·
从零构建OpenDDS分布式消息系统:IDL定义到RTPS通信实战指南
1. 环境准备与工具链配置
在开始OpenDDS开发前,需要搭建完整的工具链环境。推荐使用Ubuntu 20.04 LTS或更高版本作为开发平台,以下是必备组件:
# 安装基础编译工具
sudo apt update && sudo apt install -y build-essential cmake git
# 安装ACE+TAO依赖
sudo apt install -y libssl-dev libxml2-dev
OpenDDS的核心依赖包括:
- ACE+TAO :提供跨平台网络抽象层
- MPC :Make Project Creator构建工具
- OpenDDS :DDS实现本体
建议使用源码编译方式安装:
git clone https://github.com/DOCGroup/ACE_TAO.git
cd ACE_TAO/ACE
./configure --prefix=/opt/ace
make -j$(nproc)
sudo make install
export ACE_ROOT=/opt/ace
export TAO_ROOT=$ACE_ROOT/TAO
export LD_LIBRARY_PATH=$ACE_ROOT/lib:$LD_LIBRARY_PATH
编译OpenDDS时需特别注意:
git clone https://github.com/OpenDDS/OpenDDS.git
cd OpenDDS
./configure --prefix=/opt/opendds
make -j$(nproc)
sudo make install
export DDS_ROOT=/opt/opendds
export PATH=$DDS_ROOT/bin:$PATH
提示:Windows环境下建议使用Visual Studio 2019或更高版本,需额外配置ACE_TAO的环境变量
2. IDL文件设计与编译
2.1 消息类型定义
创建 Messenger.idl 定义分布式消息结构:
module Messenger {
@topic
struct Message {
@key long subject_id; // 消息唯一标识
string from; // 发送者标识
string subject; // 消息主题
string text; // 消息内容
long count; // 序列号
};
};
关键设计要点:
@topic注解标记可传输的数据类型@key指定实例标识字段- 支持嵌套结构和复杂类型
2.2 IDL编译流程
使用OpenDDS提供的编译工具链:
# 生成类型支持代码
opendds_idl Messenger.idl
tao_idl Messenger.idl
tao_idl MessengerTypeSupport.idl
# 查看生成文件
ls -l Messenger*.*
生成的关键文件包括:
| 文件类型 | 用途 |
|---|---|
| MessengerTypeSupportImpl.h/cpp | 类型支持实现 |
| MessengerC.h/cpp | TAO类型定义 |
| MessengerTypeSupportC.h/cpp | DDS类型支持 |
注意:实际项目中建议通过CMake或MPC管理编译流程
3. 发布者实现详解
3.1 参与者初始化
创建 publisher.cpp 实现消息发布:
#include <dds/DCPS/Service_Participant.h>
#include "MessengerTypeSupportImpl.h"
int main(int argc, char* argv[]) {
// 初始化DDS域
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);
DDS::DomainParticipant_var participant =
dpf->create_participant(42, PARTICIPANT_QOS_DEFAULT, 0, 0);
// 注册消息类型
Messenger::MessageTypeSupport_var ts =
new Messenger::MessageTypeSupportImpl;
ts->register_type(participant, "");
// 创建Topic
DDS::Topic_var topic = participant->create_topic(
"ChatRoom",
ts->get_type_name(),
TOPIC_QOS_DEFAULT, 0, 0);
}
3.2 数据写入实现
配置发布者和数据写入器:
// 创建Publisher
DDS::Publisher_var pub = participant->create_publisher(
PUBLISHER_QOS_DEFAULT, 0, 0);
// 创建DataWriter
DDS::DataWriter_var writer = pub->create_datawriter(
topic, DATAWRITER_QOS_DEFAULT, 0, 0);
// 窄化到具体类型
Messenger::MessageDataWriter_var message_writer =
Messenger::MessageDataWriter::_narrow(writer);
// 等待订阅者连接
DDS::StatusCondition_var condition = writer->get_statuscondition();
condition->set_enabled_statuses(DDS::PUBLICATION_MATCHED_STATUS);
DDS::WaitSet_var ws = new DDS::WaitSet;
ws->attach_condition(condition);
while (true) {
DDS::PublicationMatchedStatus matches;
writer->get_publication_matched_status(matches);
if (matches.current_count >= 1) break;
// 超时处理...
}
3.3 消息发布循环
实现周期性消息发布:
Messenger::Message message;
message.subject_id = 1;
message.from = "Publisher";
message.subject = "Greeting";
message.text = "Hello DDS!";
message.count = 0;
for (int i = 0; i < 10; ++i) {
message_writer->write(message, DDS::HANDLE_NIL);
++message.count;
++message.subject_id;
ACE_OS::sleep(1); // 每秒发布一次
}
4. 订阅者实现与监听器
4.1 数据监听器设计
创建 DataReaderListenerImpl.h :
class DataReaderListenerImpl : public virtual DDS::DataReaderListener {
public:
void on_data_available(DDS::DataReader_ptr reader) override {
Messenger::MessageDataReader_var reader_i =
Messenger::MessageDataReader::_narrow(reader);
Messenger::Message message;
DDS::SampleInfo si;
while (reader_i->take_next_sample(message, si) == DDS::RETCODE_OK) {
if (si.valid_data) {
std::cout << "Received: " << message.text.in()
<< " Count: " << message.count << std::endl;
}
}
}
// 其他回调方法保持默认实现...
};
4.2 订阅者主流程
创建 subscriber.cpp :
#include "DataReaderListenerImpl.h"
int main(int argc, char* argv[]) {
// 初始化与发布者相同的域
DDS::DomainParticipant_var participant = ...;
// 创建Subscriber
DDS::Subscriber_var sub = participant->create_subscriber(
SUBSCRIBER_QOS_DEFAULT, 0, 0);
// 创建DataReader并设置监听器
DDS::DataReaderListener_var listener(new DataReaderListenerImpl);
DDS::DataReader_var reader = sub->create_datareader(
topic, DATAREADER_QOS_DEFAULT, listener, 0);
// 主循环保持运行
ACE_OS::sleep(30); // 运行30秒
return 0;
}
5. RTPS通信配置实战
5.1 发现配置
创建 rtps.ini 配置文件:
[common]
DCPSGlobalTransportConfig=$file
DCPSDefaultDiscovery=DEFAULT_RTPS
[transport/the_rtps_transport]
transport_type=rtps_udp
关键参数说明:
DCPSDefaultDiscovery:指定RTPS发现机制transport_type:设置UDP作为底层传输
5.2 运行带RTPS的系统
分别启动发布者和订阅者:
# 终端1 - 订阅者
./subscriber -DCPSConfigFile rtps.ini
# 终端2 - 发布者
./publisher -DCPSConfigFile rtps.ini
验证RTPS通信的关键指标:
- 使用Wireshark捕获
udp.port == 7400-7500的流量 - 观察SPDP和SEDP发现报文
- 确认用户数据报文传输
6. 高级配置与性能优化
6.1 QoS策略调整
修改 DATAWRITER_QOS_DEFAULT :
DDS::DataWriterQos qos;
pub->get_default_datawriter_qos(qos);
// 可靠性配置
qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
qos.reliability.max_blocking_time.sec = 1;
// 历史深度设置
qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
qos.history.depth = 10;
DDS::DataWriter_var writer = pub->create_datawriter(topic, qos, 0, 0);
6.2 多线程处理
提升吞吐量的关键配置:
// 设置多线程策略
DDS::DomainParticipantQos part_qos;
dpf->get_default_participant_qos(part_qos);
part_qos.entity_factory.autoenable_created_entities = true;
// 创建带线程池的参与者
DDS::DomainParticipant_var participant =
dpf->create_participant(42, part_qos, 0, 0);
6.3 性能监控
添加统计收集模块:
#include <dds/DCPS/transport/framework/TransportRegistry.h>
// 启用传输统计
OpenDDS::DCPS::TransportConfig_rch config =
TheTransportRegistry->get_config("rtps_config");
config->count_messages(true);
config->count_bytes(true);
// 定期输出统计信息
TheTransportRegistry->dump_stats();
7. 典型问题排查指南
7.1 编译问题
常见错误及解决方案:
- TAO_IDL编译失败 :检查环境变量
ACE_ROOT设置 - 链接错误 :确认链接顺序
-lOpenDDS_Dcps -lTAO -lACE - 类型支持缺失 :验证IDL文件是否正确定义
@topic
7.2 运行时问题
诊断工具使用示例:
# 查看DDS实体关系
opendds-info -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo
# 启用调试日志
export DDS_debug_level=10
./publisher
7.3 网络配置
RTPS通信的关键检查点:
- 确认多播地址
239.255.0.1可达 - 检查防火墙放行UDP端口7400-7500
- 验证网络MTU设置避免分片
更多推荐
所有评论(0)