OpenDDS 跨主机通信配置与实现(C++和Java)
。本文将指导读者如何准备两台主机的环境,包括配置主机的 IP 地址、端口设置、域参与者列表等,并设置正确的 QoS 策略以确保通信的稳定和高效。通过本文,读者将了解到在 OpenDDS 中跨主机通信的必要步骤和注意事项,从而能够成功实现两台主机之间的数据交换和通信。
目录
1、编写一个示例
1.1、IDL接口定义
假设我们现在有以下结构:
struct MessagerOne
{
int subject_id;
string time;
string text;
int index;
double start Counter;
double frequency;
};
struct Heartbeat
{
int use_id;
int index;
};
那么我们将其编写为idl格式,文件名为DDSData.h,文件内容为:
module DDSData{
@topic
struct MessagerOne
{
@key long subject_id;
string time;
string text;
long index;
double startCounter;
double frequency;
};
@topic
struct Heartbeat
{
@key long use_id;
long index;
};
};
然后生成DDSData _Export.h文件: 此处的名称规则为DDSData _Export.h
perl D:\OpenDDS\ACE_wrappers\bin\generate_export_file.pl DDSData > DDSData_Export.h
生成的文件内容:
1.2、MPC文件介绍
dcps 项目就是用来生成动态库的,dcps_java项目是用于生成jar包的,另外俩个项目就是一个简易的订阅/发布例子。
这里缺少 Publisher.cpp、 Subscriber.cpp、DataReaderListenerImpl.cpp、DataReaderListenerImpl.h四个文件,后面会贴出代码。
project(*idl): dcps {
requires += no_opendds_safety_profile
TypeSupport_Files {
DDSData.idl
}
custom_only = 1
}
project(*java): dcps_java {
idlflags += -Wb,stub_export_include=DDSData_Export.h \
-Wb,stub_export_macro=DDSData_Export
dcps_ts_flags += -Wb,export_export_include=DDSData_Export.h \
-Wb,export_export_macro=DDSData_Export
idl2jniflags += -Wb,stub_export_include=DDSData_Export.h \
-Wb,stub_export_macro=DDSData_Export
dynamicflags += DDSDATA_BUILD_DLL
TypeSupport_Files {
DDSData.idl
}
specific{
jarname = DDSData
}
Source_Files {
}
}
project(*publisher) : dcpsexe, dcps_tcp, dcps_rtps_udp {
requires += no_opendds_safety_profile
exename = publisher
after += *idl
TypeSupport_Files {
DDSData.idl
}
Source_Files {
Publisher.cpp
}
}
project(*subscriber) : dcpsexe, dcps_tcp, dcps_rtps_udp {
requires += no_opendds_safety_profile
exename = subscriber
after += *publisher
TypeSupport_Files {
DDSData.idl
}
Source_Files {
DataReaderListenerImpl.cpp
Subscriber.cpp
}
}
Publisher.cpp
#include "DDSData_TS.hpp"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_unistd.h"
#ifdef ACE_AS_STATIC_LIBS
# include "dds/DCPS/RTPS/RtpsDiscovery.h"
# include "dds/DCPS/transport/rtps_udp/RtpsUdp.h"
#endif
// FUZZ: disable check_for_improper_main_declaration
int main(int, char*[])
{
// Initialize the TS interface
FACE::RETURN_CODE_TYPE status;
FACE::TS::Initialize("face_config.ini", status);
if (status != FACE::RC_NO_ERROR) {
return static_cast<int>(status);
}
// Create the pub connection
FACE::CONNECTION_ID_TYPE connId;
FACE::CONNECTION_DIRECTION_TYPE dir;
FACE::MESSAGE_SIZE_TYPE max_msg_size;
FACE::TS::Create_Connection(
"pub", FACE::PUB_SUB, connId, dir,
max_msg_size, FACE::INF_TIME_VALUE, status);
if (status != FACE::RC_NO_ERROR) {
return static_cast<int>(status);
}
// Message to send
Messenger::Message msg;
msg.text = "Hello, World!";
msg.subject_id = 14;
msg.count = 1;
// Send message
FACE::TRANSACTION_ID_TYPE txn;
ACE_DEBUG((LM_INFO, "Publisher: about to Send_Message()\n"));
FACE::TS::Send_Message(
connId, FACE::INF_TIME_VALUE, txn, msg,
max_msg_size, status);
if (status != FACE::RC_NO_ERROR) {
return static_cast<int>(status);
}
// Give message time to be processed before exiting
ACE_OS::sleep(15);
// Destroy the pub connection
FACE::TS::Destroy_Connection(connId, status);
if (status != FACE::RC_NO_ERROR) {
return static_cast<int>(status);
}
return EXIT_SUCCESS;
}
// FUZZ: enable check_for_improper_main_declaration
Subscriber.cpp
#include "DDSData_TS.hpp"
#include "ace/Log_Msg.h"
#ifdef ACE_AS_STATIC_LIBS
# include "dds/DCPS/RTPS/RtpsDiscovery.h"
# include "dds/DCPS/transport/rtps_udp/RtpsUdp.h"
#endif
// FUZZ: disable check_for_improper_main_declaration
int main(int, char*[])
{
// Initialize the TS interface
FACE::RETURN_CODE_TYPE status;
FACE::TS::Initialize("face_config.ini", status);
if (status != FACE::RC_NO_ERROR) {
return static_cast<int>(status);
}
// Create the sub connection
FACE::CONNECTION_ID_TYPE connId;
FACE::CONNECTION_DIRECTION_TYPE dir;
FACE::MESSAGE_SIZE_TYPE max_msg_size;
FACE::TS::Create_Connection(
"sub", FACE::PUB_SUB, connId, dir,
max_msg_size, FACE::INF_TIME_VALUE, status);
if (status != FACE::RC_NO_ERROR) {
return static_cast<int>(status);
}
// Receive a message
const FACE::TIMEOUT_TYPE timeout = FACE::INF_TIME_VALUE;
FACE::TRANSACTION_ID_TYPE txn;
Messenger::Message msg;
ACE_DEBUG((
LM_INFO, "Subscriber: about to Receive_Message()\n"));
FACE::TS::Receive_Message(
connId, timeout, txn, msg, max_msg_size, status);
if (status != FACE::RC_NO_ERROR) {
return static_cast<int>(status);
}
// Output the message
ACE_DEBUG((LM_INFO, "%C\t%d\n", msg.text.in(), msg.count));
// Destroy the sub connection
FACE::TS::Destroy_Connection(connId, status);
if (status != FACE::RC_NO_ERROR) {
return static_cast<int>(status);
}
return EXIT_SUCCESS;
}
// FUZZ: enable check_for_improper_main_declaration
DataReaderListenerImpl.h
#ifndef DATAREADER_LISTENER_IMPL_H
#define DATAREADER_LISTENER_IMPL_H
#include <ace/Global_Macros.h>
#include <dds/DdsDcpsSubscriptionC.h>
#include <dds/DCPS/LocalObject.h>
#include <dds/DCPS/Definitions.h>
class DataReaderListenerImpl
: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
public:
virtual void on_requested_deadline_missed(
DDS::DataReader_ptr reader,
const DDS::RequestedDeadlineMissedStatus& status);
virtual void on_requested_incompatible_qos(
DDS::DataReader_ptr reader,
const DDS::RequestedIncompatibleQosStatus& status);
virtual void on_sample_rejected(
DDS::DataReader_ptr reader,
const DDS::SampleRejectedStatus& status);
virtual void on_liveliness_changed(
DDS::DataReader_ptr reader,
const DDS::LivelinessChangedStatus& status);
virtual void on_data_available(
DDS::DataReader_ptr reader);
virtual void on_subscription_matched(
DDS::DataReader_ptr reader,
const DDS::SubscriptionMatchedStatus& status);
virtual void on_sample_lost(
DDS::DataReader_ptr reader,
const DDS::SampleLostStatus& status);
};
#endif /* DATAREADER_LISTENER_IMPL_H */
DataReaderListenerImpl.cpp
#include <ace/Log_Msg.h>
#include <ace/OS_NS_stdlib.h>
#include "DataReaderListenerImpl.h"
#include "MessengerTypeSupportC.h"
#include "MessengerTypeSupportImpl.h"
#include <iostream>
void
DataReaderListenerImpl::on_requested_deadline_missed(
DDS::DataReader_ptr /*reader*/,
const DDS::RequestedDeadlineMissedStatus& /*status*/)
{
}
void
DataReaderListenerImpl::on_requested_incompatible_qos(
DDS::DataReader_ptr /*reader*/,
const DDS::RequestedIncompatibleQosStatus& /*status*/)
{
}
void
DataReaderListenerImpl::on_sample_rejected(
DDS::DataReader_ptr /*reader*/,
const DDS::SampleRejectedStatus& /*status*/)
{
}
void
DataReaderListenerImpl::on_liveliness_changed(
DDS::DataReader_ptr /*reader*/,
const DDS::LivelinessChangedStatus& /*status*/)
{
}
void
DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader)
{
Messenger::MessageDataReader_var reader_i =
Messenger::MessageDataReader::_narrow(reader);
if (!reader_i) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("ERROR: %N:%l: on_data_available() -")
ACE_TEXT(" _narrow failed!\n")));
ACE_OS::exit(1);
}
Messenger::Message message;
DDS::SampleInfo info;
const DDS::ReturnCode_t error = reader_i->take_next_sample(message, info);
if (error == DDS::RETCODE_OK) {
std::cout << "SampleInfo.sample_rank = " << info.sample_rank << std::endl;
std::cout << "SampleInfo.instance_state = " << OpenDDS::DCPS::InstanceState::instance_state_mask_string(info.instance_state) << std::endl;
if (info.valid_data) {
// 数据的输出 取决于你的IDL定义
std::cout << "Message: subject = " << message.subject.in() << std::endl
<< " subject_id = " << message.subject_id << std::endl
<< " from = " << message.from.in() << std::endl
<< " count = " << message.count << std::endl
<< " text = " << message.text.in() << std::endl;
}
} else {
ACE_ERROR((LM_ERROR,
ACE_TEXT("ERROR: %N:%l: on_data_available() -")
ACE_TEXT(" take_next_sample failed!\n")));
}
}
void
DataReaderListenerImpl::on_subscription_matched(
DDS::DataReader_ptr /*reader*/,
const DDS::SubscriptionMatchedStatus& /*status*/)
{
}
void
DataReaderListenerImpl::on_sample_lost(
DDS::DataReader_ptr /*reader*/,
const DDS::SampleLostStatus& /*status*/)
{
}
1.3、生成解决方案
通过mwc生成vs解决方案
perl D:\OpenDDS\ACE_wrappers\bin\mwc.pl -type vs2019 -features java=1
生成后如下图所示:(文件名称忽略掉,不是一个工程)
打开*.sln,编译Messenger_Idl工程,无报错说明C++端的已生成成功。编译Messenger_Java工程,无报错说明java端的已生成成功。
2、通讯测试
2.1、使用repo server 通讯
运行程序DCPSInfoRepo,subscriber,publisher;
先在A机器上运行DCPSInfoRepo和subscriber;然后在B机器上运行publisher。
(1)A机器上先运行&DDS_ROOT/bin/DCPSInfoRepo-ORBDebugLevel 10 -ORBLogFile DCPSInfoRepo.log -o repo.ior;
(2)拷贝A机器上repo.ior文件到B机器上;
(3)A机器上运行./subscriber -ORBDebugLevel 10 -DCPSDebugLevel 10 – DCPSTransportDebugLevel 6 -ORBLogFile subscriber.log
(4)B机器上运行./publisher -ORBDebugLevel 10 -DCPSDebugLevel 10 – ORBLogFile publisher.log
注意事项:
(1) DCPSInfoRepo生成的repo.ior文件必须存放在subscriber和publisher工程文件目录下;
(2) 必须先运行DCPSInfoRepo,然后拷贝repo.ior文件到其他机器上;
rtps.ini文件内容(DCPSInfoRepo后面的路径为你的实际路径)
[common]
DCPSInfoRepo=file://D:\OpenDDS\DevGuideExamples\DCPS\Messenger\repo.ior
[transport/1]
transport_type=tcp
2.2、使用repo ip+port方式
运行程序DCPSInfoRepo,subscriber,publisher;
先在A机器上运行DCPSInfoRepo和subscriber;然后在B机器上运行publisher。
DCPSInfoRepo:
DCPSInfoRepo -ORBListenEndpoints iiop://192.168.175.135:12345 -ORBDebugLevel 10 -ORBLogFile DCPSInfoRepo.log
subscriber订阅:
.\subscriber.exe -DCPSConfigFile rtps.ini
publisher发布:
.\publisher.exe -DCPSConfigFile rtps.ini
rtps.ini文件内容
[common]
DCPSDebugLevel=0
DCPSInfoRepo=corbaloc::192.168.0.11:12345/DCPSInfoRepo
DCPSChunks=20
DCPSChunkAssociationMutltiplier=10
DCPSLivelinessFactor=80
DCPSBit=0
DCPSGlobalTransportConfig=$file
[domain/42]
DiscoveryConfig=uni_rtps
[rtps_discovery/uni_rtps]
SedpMulticast=0
ResendPeriod=2
[transport/the_rtps_transport]
transport_type=rtps_udp
use_multicast=0
[topic/Message]
platform_view_guid=103
type_name=DDSData::MessageOne
max_message_size=300
2.3、对等发现face
运行程序subscriber,publisher;
先在A机器上运行subscriber;然后在B机器上运行publisher。
rtps.ini文件内容
[common]
DCPSGlobalTransportConfig=$file
DCPSDefaultDiscovery=DEFAULT_RTPS
[domain/3]
DiscoveryConfig=uni_rtps
[rtps_discovery/uni_rtps]
SedpMulticast=0
ResendPeriod=2
[transport/the_rtps_transport]
transport_type=rtps_udp
use_multicast=0
[topic/Message]
platform_view_guid=103
type_name=DDSData::MessageOne
max_message_size=300
关于Java端的代码具体可参考OpenDDS\java\tests\messenger下的demo。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)