1、编写一个示例

1.1、IDL接口定义

假设我们现在有以下结构:

struct MessagerOne
{
int subject_id;
string time;
string text;
int index;
double start Counter;
double frequency;
 };
struct Heartbeat
{
	int use_id;
	int index;
  };

那么我们将其编写为idl格式,文件名为DDSData.h,文件内容为:

module DDSData{
  @topic
  struct MessagerOne
  {
	@key long subject_id;
	string time;
	string text;
	long index;
	double startCounter;
	double frequency;
  };	
  @topic
  struct Heartbeat
  {
	@key long use_id;
	long index;
  };
};

然后生成DDSData _Export.h文件: 此处的名称规则为DDSData _Export.h

perl D:\OpenDDS\ACE_wrappers\bin\generate_export_file.pl DDSData > DDSData_Export.h

生成的文件内容:
在这里插入图片描述

1.2、MPC文件介绍

dcps 项目就是用来生成动态库的,dcps_java项目是用于生成jar包的,另外俩个项目就是一个简易的订阅/发布例子。
这里缺少 Publisher.cpp、 Subscriber.cpp、DataReaderListenerImpl.cpp、DataReaderListenerImpl.h四个文件,后面会贴出代码。

project(*idl): dcps {
  requires += no_opendds_safety_profile
  TypeSupport_Files {
    DDSData.idl
  }

  custom_only = 1
}

project(*java): dcps_java {
  idlflags      += -Wb,stub_export_include=DDSData_Export.h \
                   -Wb,stub_export_macro=DDSData_Export
  dcps_ts_flags += -Wb,export_export_include=DDSData_Export.h \
			-Wb,export_export_macro=DDSData_Export
  idl2jniflags  += -Wb,stub_export_include=DDSData_Export.h \
                   -Wb,stub_export_macro=DDSData_Export
  dynamicflags  += DDSDATA_BUILD_DLL

  TypeSupport_Files {
    DDSData.idl
  }
  specific{
    jarname = DDSData
  }
  Source_Files {
  }
}

project(*publisher) : dcpsexe, dcps_tcp, dcps_rtps_udp {
  requires += no_opendds_safety_profile
  exename   = publisher
  after    += *idl

  TypeSupport_Files {
    DDSData.idl
  }
	
  Source_Files {
    Publisher.cpp
  }
}

project(*subscriber) : dcpsexe, dcps_tcp, dcps_rtps_udp {
  requires += no_opendds_safety_profile
  exename   = subscriber
  after    += *publisher

  TypeSupport_Files {
    DDSData.idl
  }

  Source_Files {
    DataReaderListenerImpl.cpp
    Subscriber.cpp
  }
}

Publisher.cpp

#include "DDSData_TS.hpp"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_unistd.h"
 
#ifdef ACE_AS_STATIC_LIBS
# include "dds/DCPS/RTPS/RtpsDiscovery.h"
# include "dds/DCPS/transport/rtps_udp/RtpsUdp.h"
#endif
 
// FUZZ: disable check_for_improper_main_declaration
int main(int, char*[])
{
  // Initialize the TS interface
  FACE::RETURN_CODE_TYPE status;
  FACE::TS::Initialize("face_config.ini", status);
 
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }
 
  // Create the pub connection
  FACE::CONNECTION_ID_TYPE connId;
  FACE::CONNECTION_DIRECTION_TYPE dir;
  FACE::MESSAGE_SIZE_TYPE max_msg_size;
  FACE::TS::Create_Connection(
    "pub", FACE::PUB_SUB, connId, dir,
    max_msg_size, FACE::INF_TIME_VALUE, status);
 
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }
 
  // Message to send
  Messenger::Message msg;
  msg.text = "Hello, World!";
  msg.subject_id = 14;
  msg.count = 1;
 
  // Send message
  FACE::TRANSACTION_ID_TYPE txn;
  ACE_DEBUG((LM_INFO, "Publisher: about to Send_Message()\n"));
  FACE::TS::Send_Message(
    connId, FACE::INF_TIME_VALUE, txn, msg,
    max_msg_size, status);
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }
 
  // Give message time to be processed before exiting
  ACE_OS::sleep(15);
 
  // Destroy the pub connection
  FACE::TS::Destroy_Connection(connId, status);
 
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }
 
  return EXIT_SUCCESS;
}
// FUZZ: enable check_for_improper_main_declaration

Subscriber.cpp

#include "DDSData_TS.hpp"
#include "ace/Log_Msg.h"
 
#ifdef ACE_AS_STATIC_LIBS
# include "dds/DCPS/RTPS/RtpsDiscovery.h"
# include "dds/DCPS/transport/rtps_udp/RtpsUdp.h"
#endif
 
// FUZZ: disable check_for_improper_main_declaration
int main(int, char*[])
{
  // Initialize the TS interface
  FACE::RETURN_CODE_TYPE status;
  FACE::TS::Initialize("face_config.ini", status);
 
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }
 
  // Create the sub connection
  FACE::CONNECTION_ID_TYPE connId;
  FACE::CONNECTION_DIRECTION_TYPE dir;
  FACE::MESSAGE_SIZE_TYPE max_msg_size;
  FACE::TS::Create_Connection(
    "sub", FACE::PUB_SUB, connId, dir,
    max_msg_size, FACE::INF_TIME_VALUE, status);
 
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }
 
  // Receive a message
  const FACE::TIMEOUT_TYPE timeout = FACE::INF_TIME_VALUE;
  FACE::TRANSACTION_ID_TYPE txn;
  Messenger::Message msg;
 
  ACE_DEBUG((
    LM_INFO, "Subscriber: about to Receive_Message()\n"));
 
  FACE::TS::Receive_Message(
    connId, timeout, txn, msg, max_msg_size, status);
 
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }
 
  // Output the message
  ACE_DEBUG((LM_INFO, "%C\t%d\n", msg.text.in(), msg.count));
 
  // Destroy the sub connection
  FACE::TS::Destroy_Connection(connId, status);
 
  if (status != FACE::RC_NO_ERROR) {
    return static_cast<int>(status);
  }
 
  return EXIT_SUCCESS;
}
// FUZZ: enable check_for_improper_main_declaration

DataReaderListenerImpl.h

#ifndef DATAREADER_LISTENER_IMPL_H
#define DATAREADER_LISTENER_IMPL_H

#include <ace/Global_Macros.h>

#include <dds/DdsDcpsSubscriptionC.h>
#include <dds/DCPS/LocalObject.h>
#include <dds/DCPS/Definitions.h>

class DataReaderListenerImpl
  : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
public:
  virtual void on_requested_deadline_missed(
    DDS::DataReader_ptr reader,
    const DDS::RequestedDeadlineMissedStatus& status);

  virtual void on_requested_incompatible_qos(
    DDS::DataReader_ptr reader,
    const DDS::RequestedIncompatibleQosStatus& status);

  virtual void on_sample_rejected(
    DDS::DataReader_ptr reader,
    const DDS::SampleRejectedStatus& status);

  virtual void on_liveliness_changed(
    DDS::DataReader_ptr reader,
    const DDS::LivelinessChangedStatus& status);

  virtual void on_data_available(
    DDS::DataReader_ptr reader);

  virtual void on_subscription_matched(
    DDS::DataReader_ptr reader,
    const DDS::SubscriptionMatchedStatus& status);

  virtual void on_sample_lost(
    DDS::DataReader_ptr reader,
    const DDS::SampleLostStatus& status);
};

#endif /* DATAREADER_LISTENER_IMPL_H */

DataReaderListenerImpl.cpp

#include <ace/Log_Msg.h>
#include <ace/OS_NS_stdlib.h>

#include "DataReaderListenerImpl.h"
#include "MessengerTypeSupportC.h"
#include "MessengerTypeSupportImpl.h"

#include <iostream>

void
DataReaderListenerImpl::on_requested_deadline_missed(
  DDS::DataReader_ptr /*reader*/,
  const DDS::RequestedDeadlineMissedStatus& /*status*/)
{
}

void
DataReaderListenerImpl::on_requested_incompatible_qos(
  DDS::DataReader_ptr /*reader*/,
  const DDS::RequestedIncompatibleQosStatus& /*status*/)
{
}

void
DataReaderListenerImpl::on_sample_rejected(
  DDS::DataReader_ptr /*reader*/,
  const DDS::SampleRejectedStatus& /*status*/)
{
}

void
DataReaderListenerImpl::on_liveliness_changed(
  DDS::DataReader_ptr /*reader*/,
  const DDS::LivelinessChangedStatus& /*status*/)
{
}

void
DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader)
{
  Messenger::MessageDataReader_var reader_i =
    Messenger::MessageDataReader::_narrow(reader);

  if (!reader_i) {
    ACE_ERROR((LM_ERROR,
               ACE_TEXT("ERROR: %N:%l: on_data_available() -")
               ACE_TEXT(" _narrow failed!\n")));
    ACE_OS::exit(1);
  }

  Messenger::Message message;
  DDS::SampleInfo info;

  const DDS::ReturnCode_t error = reader_i->take_next_sample(message, info);

  if (error == DDS::RETCODE_OK) {
    std::cout << "SampleInfo.sample_rank = " << info.sample_rank << std::endl;
    std::cout << "SampleInfo.instance_state = " << OpenDDS::DCPS::InstanceState::instance_state_mask_string(info.instance_state) << std::endl;

if (info.valid_data) {
    // 数据的输出  取决于你的IDL定义 
      std::cout << "Message: subject    = " << message.subject.in() << std::endl
                << "         subject_id = " << message.subject_id   << std::endl
                << "         from       = " << message.from.in()    << std::endl
                << "         count      = " << message.count        << std::endl
                << "         text       = " << message.text.in()    << std::endl;

    }

  } else {
    ACE_ERROR((LM_ERROR,
               ACE_TEXT("ERROR: %N:%l: on_data_available() -")
               ACE_TEXT(" take_next_sample failed!\n")));
  }
}

void
DataReaderListenerImpl::on_subscription_matched(
  DDS::DataReader_ptr /*reader*/,
  const DDS::SubscriptionMatchedStatus& /*status*/)
{
}

void
DataReaderListenerImpl::on_sample_lost(
  DDS::DataReader_ptr /*reader*/,
  const DDS::SampleLostStatus& /*status*/)
{
}

1.3、生成解决方案

通过mwc生成vs解决方案

perl D:\OpenDDS\ACE_wrappers\bin\mwc.pl -type vs2019  -features java=1

生成后如下图所示:(文件名称忽略掉,不是一个工程)
在这里插入图片描述
打开*.sln,编译Messenger_Idl工程,无报错说明C++端的已生成成功。编译Messenger_Java工程,无报错说明java端的已生成成功。

2、通讯测试

2.1、使用repo server 通讯

运行程序DCPSInfoRepo,subscriber,publisher;
先在A机器上运行DCPSInfoRepo和subscriber;然后在B机器上运行publisher。
(1)A机器上先运行&DDS_ROOT/bin/DCPSInfoRepo-ORBDebugLevel 10 -ORBLogFile DCPSInfoRepo.log -o repo.ior;
(2)拷贝A机器上repo.ior文件到B机器上;
(3)A机器上运行./subscriber -ORBDebugLevel 10 -DCPSDebugLevel 10 – DCPSTransportDebugLevel 6 -ORBLogFile subscriber.log
(4)B机器上运行./publisher -ORBDebugLevel 10 -DCPSDebugLevel 10 – ORBLogFile publisher.log

注意事项:
(1) DCPSInfoRepo生成的repo.ior文件必须存放在subscriber和publisher工程文件目录下;
(2) 必须先运行DCPSInfoRepo,然后拷贝repo.ior文件到其他机器上;

rtps.ini文件内容(DCPSInfoRepo后面的路径为你的实际路径)

[common]
DCPSInfoRepo=file://D:\OpenDDS\DevGuideExamples\DCPS\Messenger\repo.ior
 
[transport/1]
transport_type=tcp

2.2、使用repo ip+port方式

运行程序DCPSInfoRepo,subscriber,publisher;
先在A机器上运行DCPSInfoRepo和subscriber;然后在B机器上运行publisher。

DCPSInfoRepo:
DCPSInfoRepo -ORBListenEndpoints iiop://192.168.175.135:12345 -ORBDebugLevel 10 -ORBLogFile DCPSInfoRepo.log

subscriber订阅:
.\subscriber.exe -DCPSConfigFile rtps.ini

publisher发布:
.\publisher.exe -DCPSConfigFile rtps.ini

rtps.ini文件内容

[common]
DCPSDebugLevel=0
DCPSInfoRepo=corbaloc::192.168.0.11:12345/DCPSInfoRepo
DCPSChunks=20
DCPSChunkAssociationMutltiplier=10
DCPSLivelinessFactor=80
DCPSBit=0
DCPSGlobalTransportConfig=$file

[domain/42]
DiscoveryConfig=uni_rtps

[rtps_discovery/uni_rtps]
SedpMulticast=0
ResendPeriod=2

[transport/the_rtps_transport]
transport_type=rtps_udp
use_multicast=0

[topic/Message]
platform_view_guid=103
type_name=DDSData::MessageOne
max_message_size=300

2.3、对等发现face

运行程序subscriber,publisher;
先在A机器上运行subscriber;然后在B机器上运行publisher。

rtps.ini文件内容

[common]
DCPSGlobalTransportConfig=$file
DCPSDefaultDiscovery=DEFAULT_RTPS

[domain/3]
DiscoveryConfig=uni_rtps

[rtps_discovery/uni_rtps]
SedpMulticast=0
ResendPeriod=2

[transport/the_rtps_transport]
transport_type=rtps_udp
use_multicast=0

[topic/Message]
platform_view_guid=103
type_name=DDSData::MessageOne
max_message_size=300


关于Java端的代码具体可参考OpenDDS\java\tests\messenger下的demo。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐