Skip to content

Commit

Permalink
Introducing the attribute "host group name" that allows interprocess …
Browse files Browse the repository at this point in the history
…communication across host borders
  • Loading branch information
hannemn committed Jun 29, 2023
1 parent 753bc93 commit f2cee27
Show file tree
Hide file tree
Showing 16 changed files with 80 additions and 34 deletions.
6 changes: 5 additions & 1 deletion ecal/core/cfg/ecal.ini
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,11 @@ filter_excl = ^eCALSysClient$|^eCALSysGUI$|^eCALSys$
; shm_monitoring_enabled = false Enable distribution of monitoring/registration information via shared memory
; shm_monitoring_domain = ecal_monitoring Domain name for shared memory based monitoring/registration
; shm_monitoring_queue_size = 1024 Queue size of monitoring/registration events
; network_monitoring_disabled = false Disable distribution of monitoring/registration information via network (default)
; network_monitoring_disabled = false Disable distribution of monitoring/registration information via network
;
; drop_out_of_order_messages = false Enable dropping of payload messages that arrive out of order
;
; host_group_name = "" Common host group name that enables interprocess mechanisms across (virtual) host borders (e.g, Docker); by default equivalent to local host name
; --------------------------------------------------
[experimental]
shm_monitoring_enabled = false
Expand All @@ -181,3 +183,5 @@ shm_monitoring_queue_size = 1024
network_monitoring_disabled = false

drop_out_of_order_messages = false

host_group_name = ""
1 change: 1 addition & 0 deletions ecal/core/include/ecal/ecal_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ namespace eCAL
ECAL_API size_t GetShmMonitoringQueueSize ();
ECAL_API std::string GetShmMonitoringDomain ();
ECAL_API bool GetDropOutOfOrderMessages ();
ECAL_API std::string GetHostGroupName ();
}
}
}
1 change: 1 addition & 0 deletions ecal/core/include/ecal/ecal_monitoring_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ namespace eCAL
int rclock; //!< registration clock (heart beat)
int hid; //!< host id
std::string hname; //!< host name
std::string hgname; //!< host group name
int pid; //!< process id
std::string pname; //!< process name
std::string uname; //!< unit name
Expand Down
7 changes: 7 additions & 0 deletions ecal/core/include/ecal/ecal_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ namespace eCAL
**/
ECAL_API std::string GetHostName();

/**
* @brief Get current host group name.
*
* @return Host group name or empty string if failed.
**/
ECAL_API std::string GetHostGroupName();

/**
* @brief Get unique host id.
*
Expand Down
1 change: 1 addition & 0 deletions ecal/core/src/ecal_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ namespace eCAL
ECAL_API size_t GetShmMonitoringQueueSize () { return static_cast<size_t>(eCALPAR(EXP, SHM_MONITORING_QUEUE_SIZE)); }
ECAL_API std::string GetShmMonitoringDomain () { return eCALPAR(EXP, SHM_MONITORING_DOMAIN);}
ECAL_API bool GetDropOutOfOrderMessages () { return eCALPAR(EXP, DROP_OUT_OF_ORDER_MESSAGES); }
ECAL_API std::string GetHostGroupName () { return eCALPAR(EXP, HOST_GROUP_NAME); }
}
}
}
5 changes: 4 additions & 1 deletion ecal/core/src/ecal_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
/**********************************************************************************************/
/* enable distribution of monitoring/registration information via shared memory */
#define EXP_SHM_MONITORING_ENABLED false
/* disable distribution of monitoring/registration information via network (default) */
/* disable distribution of monitoring/registration information via network */
#define EXP_NETWORK_MONITORING_DISABLED false
/* queue size of monitoring/registration events */
#define EXP_SHM_MONITORING_QUEUE_SIZE 1024
Expand All @@ -184,3 +184,6 @@

/* enable dropping of payload messages that arrive out of order */
#define EXP_DROP_OUT_OF_ORDER_MESSAGES false

/* common host group name that enables interprocess mechanisms across (virtual) host borders (e.g, Docker); by default equivalent to local host name */
#define EXP_HOST_GROUP_NAME ""
1 change: 1 addition & 0 deletions ecal/core/src/ecal_def_ini.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,4 @@
#define EXP_SHM_MONITORING_QUEUE_SIZE_S "shm_monitoring_queue_size"
#define EXP_SHM_MONITORING_DOMAIN_S "shm_monitoring_domain"
#define EXP_DROP_OUT_OF_ORDER_MESSAGES_S "drop_out_of_order_messages"
#define EXP_HOST_GROUP_NAME_S "host_group_name"
5 changes: 5 additions & 0 deletions ecal/core/src/ecal_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ namespace eCAL
return(g_host_name);
}

std::string GetHostGroupName()
{
return Config::Experimental::GetHostGroupName().empty() ? GetHostName() : Config::Experimental::GetHostGroupName();
}

int GetHostID()
{
return internal::GetHostID();
Expand Down
23 changes: 14 additions & 9 deletions ecal/core/src/ecal_registration_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ namespace eCAL
m_callback_process(nullptr),
m_use_network_monitoring(false),
m_use_shm_monitoring(false),
m_callback_custom_apply_sample([](const auto&){})
m_callback_custom_apply_sample([](const auto&){}),
m_host_group_name(Process::GetHostGroupName())

{
}
Expand Down Expand Up @@ -310,8 +311,8 @@ namespace eCAL

void CRegistrationReceiver::ApplySubscriberRegistration(const eCAL::pb::Sample& ecal_sample_)
{
// process local registrations
if (IsLocalHost(ecal_sample_))
// process registrations from same host group
if (IsHostGroupMember(ecal_sample_))
{
// do not register local entities, only if loop back flag is set true
if (m_loopback || (ecal_sample_.topic().pid() != Process::GetProcessID()))
Expand Down Expand Up @@ -357,8 +358,8 @@ namespace eCAL

void CRegistrationReceiver::ApplyPublisherRegistration(const eCAL::pb::Sample& ecal_sample_)
{
// process local registrations
if (IsLocalHost(ecal_sample_))
// process registrations from same host group
if (IsHostGroupMember(ecal_sample_))
{
// do not register local entities, only if loop back flag is set true
if (m_loopback || (ecal_sample_.topic().pid() != Process::GetProcessID()))
Expand Down Expand Up @@ -402,11 +403,15 @@ namespace eCAL
}
}

bool CRegistrationReceiver::IsLocalHost(const eCAL::pb::Sample& ecal_sample_)
bool CRegistrationReceiver::IsHostGroupMember(const eCAL::pb::Sample& ecal_sample_)
{
const std::string host_name = ecal_sample_.topic().hname();
if (host_name.empty()) return false;
if (host_name != eCAL::Process::GetHostName()) return false;
const std::string sample_host_group_name = ecal_sample_.topic().hgname().empty() ? ecal_sample_.topic().hname() : ecal_sample_.topic().hgname();

if (sample_host_group_name.empty() || m_host_group_name.empty())
return false;
if (sample_host_group_name != m_host_group_name)
return false;

return true;
}

Expand Down
38 changes: 20 additions & 18 deletions ecal/core/src/ecal_registration_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,30 +97,32 @@ namespace eCAL
void ApplySubscriberRegistration(const eCAL::pb::Sample& ecal_sample_);
void ApplyPublisherRegistration(const eCAL::pb::Sample& ecal_sample_);

bool IsLocalHost(const eCAL::pb::Sample & ecal_sample_);

static std::atomic<bool> m_created;
bool m_network;
bool m_loopback;

RegistrationCallbackT m_callback_pub;
RegistrationCallbackT m_callback_sub;
RegistrationCallbackT m_callback_service;
RegistrationCallbackT m_callback_client;
RegistrationCallbackT m_callback_process;

CUDPReceiver m_reg_rcv;
CThread m_reg_rcv_thread;
CUdpRegistrationReceiver m_reg_rcv_process;
bool IsHostGroupMember(const eCAL::pb::Sample & ecal_sample_);

static std::atomic<bool> m_created;
bool m_network;
bool m_loopback;
RegistrationCallbackT m_callback_pub;
RegistrationCallbackT m_callback_sub;
RegistrationCallbackT m_callback_service;
RegistrationCallbackT m_callback_client;
RegistrationCallbackT m_callback_process;
CUDPReceiver m_reg_rcv;
CThread m_reg_rcv_thread;
CUdpRegistrationReceiver m_reg_rcv_process;

eCAL::CMemoryFileBroadcast m_memfile_broadcast;
eCAL::CMemoryFileBroadcastReader m_memfile_broadcast_reader;
CMemfileRegistrationReceiver m_memfile_reg_rcv;
CThread m_memfile_reg_rcv_thread;

bool m_use_network_monitoring;
bool m_use_shm_monitoring;
bool m_use_network_monitoring;
bool m_use_shm_monitoring;

ApplySampleCallbackT m_callback_custom_apply_sample;

ApplySampleCallbackT m_callback_custom_apply_sample;
std::string m_host_group_name;
};
};
15 changes: 10 additions & 5 deletions ecal/core/src/mon/ecal_monitoring_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,12 @@ namespace eCAL
const std::lock_guard<std::mutex> lock(pTopicMap->sync);

// common infos
const int host_id = sample_topic.hid();
const std::string& host_name = sample_topic.hname();
const std::string& process_name = sample_topic.pname();
const std::string& unit_name = sample_topic.uname();
const std::string& topic_id = sample_topic.tid();
const int host_id = sample_topic.hid();
const std::string& host_name = sample_topic.hname();
const std::string& host_group_name = sample_topic.hgname();
const std::string& process_name = sample_topic.pname();
const std::string& unit_name = sample_topic.uname();
const std::string& topic_id = sample_topic.tid();
std::string direction;
switch (pubsub_type_)
{
Expand All @@ -333,6 +334,7 @@ namespace eCAL
// set static content
TopicInfo.hid = host_id;
TopicInfo.hname = host_name;
TopicInfo.hgname = host_group_name;
TopicInfo.pid = process_id;
TopicInfo.pname = process_name;
TopicInfo.uname = unit_name;
Expand Down Expand Up @@ -943,6 +945,9 @@ namespace eCAL
// host name
pMonTopic->set_hname(topic.second.hname);

// host group name
pMonTopic->set_hgname(topic.second.hgname);

// process id
pMonTopic->set_pid(topic.second.pid);

Expand Down
4 changes: 4 additions & 0 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace eCAL
////////////////////////////////////////
CDataReader::CDataReader() :
m_host_name(Process::GetHostName()),
m_host_group_name(Process::GetHostGroupName()),
m_host_id(Process::internal::GetHostID()),
m_pid(Process::GetProcessID()),
m_pname(Process::GetProcessName()),
Expand Down Expand Up @@ -218,6 +219,7 @@ namespace eCAL
ecal_reg_sample.set_cmd_type(eCAL::pb::bct_reg_subscriber);
auto *ecal_reg_sample_mutable_topic = ecal_reg_sample.mutable_topic();
ecal_reg_sample_mutable_topic->set_hname(m_host_name);
ecal_reg_sample_mutable_topic->set_hgname(m_host_group_name);
ecal_reg_sample_mutable_topic->set_hid(m_host_id);
ecal_reg_sample_mutable_topic->set_tname(m_topic_name);
ecal_reg_sample_mutable_topic->set_tid(m_topic_id);
Expand Down Expand Up @@ -324,6 +326,7 @@ namespace eCAL
ecal_unreg_sample.set_cmd_type(eCAL::pb::bct_unreg_subscriber);
auto *ecal_reg_sample_mutable_topic = ecal_unreg_sample.mutable_topic();
ecal_reg_sample_mutable_topic->set_hname(m_host_name);
ecal_reg_sample_mutable_topic->set_hgname(m_host_group_name);
ecal_reg_sample_mutable_topic->set_hid(m_host_id);
ecal_reg_sample_mutable_topic->set_pname(m_pname);
ecal_reg_sample_mutable_topic->set_pid(m_pid);
Expand Down Expand Up @@ -980,6 +983,7 @@ namespace eCAL
out << indent_ << " class CDataReader " << std::endl;
out << indent_ << "--------------------------------" << std::endl;
out << indent_ << "m_host_name: " << m_host_name << std::endl;
out << indent_ << "m_host_group_name: " << m_host_group_name << std::endl;
out << indent_ << "m_host_id: " << m_host_id << std::endl;
out << indent_ << "m_topic_name: " << m_topic_name << std::endl;
out << indent_ << "m_topic_id: " << m_topic_id << std::endl;
Expand Down
1 change: 1 addition & 0 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ namespace eCAL
bool CheckMessageClock(const std::string& tid_, long long current_clock_);

std::string m_host_name;
std::string m_host_group_name;
int m_host_id;
int m_pid;
std::string m_pname;
Expand Down
4 changes: 4 additions & 0 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ namespace eCAL
{
CDataWriter::CDataWriter() :
m_host_name(Process::GetHostName()),
m_host_group_name(Process::GetHostGroupName()),
m_host_id(Process::internal::GetHostID()),
m_pid(Process::GetProcessID()),
m_pname(Process::GetProcessName()),
Expand Down Expand Up @@ -805,6 +806,7 @@ namespace eCAL
out << indent_ << " class CDataWriter " << std::endl;
out << indent_ << "--------------------------------" << std::endl;
out << indent_ << "m_host_name: " << m_host_name << std::endl;
out << indent_ << "m_host_group_name: " << m_host_group_name << std::endl;
out << indent_ << "m_host_id: " << m_host_id << std::endl;
out << indent_ << "m_topic_name: " << m_topic_name << std::endl;
out << indent_ << "m_topic_id: " << m_topic_id << std::endl;
Expand Down Expand Up @@ -843,6 +845,7 @@ namespace eCAL
ecal_reg_sample.set_cmd_type(eCAL::pb::bct_reg_publisher);
auto *ecal_reg_sample_mutable_topic = ecal_reg_sample.mutable_topic();
ecal_reg_sample_mutable_topic->set_hname(m_host_name);
ecal_reg_sample_mutable_topic->set_hgname(m_host_group_name);
ecal_reg_sample_mutable_topic->set_hid(m_host_id);
ecal_reg_sample_mutable_topic->set_tname(m_topic_name);
ecal_reg_sample_mutable_topic->set_tid(m_topic_id);
Expand Down Expand Up @@ -977,6 +980,7 @@ namespace eCAL
ecal_unreg_sample.set_cmd_type(eCAL::pb::bct_unreg_publisher);
auto* ecal_reg_sample_mutable_topic = ecal_unreg_sample.mutable_topic();
ecal_reg_sample_mutable_topic->set_hname(m_host_name);
ecal_reg_sample_mutable_topic->set_hgname(m_host_group_name);
ecal_reg_sample_mutable_topic->set_hid(m_host_id);
ecal_reg_sample_mutable_topic->set_pname(m_pname);
ecal_reg_sample_mutable_topic->set_pid(m_pid);
Expand Down
1 change: 1 addition & 0 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ namespace eCAL
void LogSendMode(TLayer::eSendMode smode_, const std::string & base_msg_);

std::string m_host_name;
std::string m_host_group_name;
int m_host_id;
int m_pid;
std::string m_pname;
Expand Down
1 change: 1 addition & 0 deletions ecal/core_pb/src/ecal/core/pb/topic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ message Topic // eCAL topic
int32 rclock = 1; // registration clock (heart beat)
int32 hid = 26; // host id
string hname = 2; // host name
string hgname = 28; // host group name
int32 pid = 3; // process id
string pname = 4; // process name
string uname = 5; // unit name
Expand Down

0 comments on commit f2cee27

Please sign in to comment.