Skip to content

Commit

Permalink
[bugfix] eCAL::core: Avoid possible deadlocks inside callbacks (#1130)
Browse files Browse the repository at this point in the history
* Transform Subscriber / Subgate datareader to shared_ptr instead of raw pointer.
* Transform CPublisher / CPubGate CDataWriter to shared_ptr instead of raw pointer.
* Successfully allow to call `Destroy()` function in callbacks. (Lock map only to retreive datareaders).
  • Loading branch information
KerstinKeller committed Jun 29, 2023
1 parent a2fb023 commit 7794a16
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 111 deletions.
99 changes: 50 additions & 49 deletions ecal/core/include/ecal/ecal_publisher.h

Large diffs are not rendered by default.

69 changes: 35 additions & 34 deletions ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <ecal/ecal_qos.h>
#include <ecal/types/topic_information.h>

#include <memory>
#include <set>
#include <string>

Expand Down Expand Up @@ -79,13 +80,13 @@ namespace eCAL
* @endcode
**/

class ECAL_API CSubscriber
class CSubscriber
{
public:
/**
* @brief Constructor.
**/
CSubscriber();
ECAL_API CSubscriber();

/**
* @brief Constructor.
Expand All @@ -95,47 +96,47 @@ namespace eCAL
* @param topic_desc_ Type description (optional for description checking).
**/
[[deprecated("Please use the constructor CSubscriber(const std::string& topic_name_, const STopicInformation& topic_info_) instead. This function will be removed in eCAL6.")]]
CSubscriber(const std::string& topic_name_, const std::string& topic_type_, const std::string& topic_desc_ = "");
ECAL_API CSubscriber(const std::string& topic_name_, const std::string& topic_type_, const std::string& topic_desc_ = "");

/**
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
* @param topic_info_ Topic information (encoding, type, descriptor)
**/
CSubscriber(const std::string& topic_name_, const STopicInformation& topic_info_);
ECAL_API CSubscriber(const std::string& topic_name_, const STopicInformation& topic_info_);

/**
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
**/
CSubscriber(const std::string& topic_name_);
ECAL_API CSubscriber(const std::string& topic_name_);

/**
* @brief Destructor.
**/
virtual ~CSubscriber();
ECAL_API virtual ~CSubscriber();

/**
* @brief CSubscribers are non-copyable
**/
CSubscriber(const CSubscriber&) = delete;
ECAL_API CSubscriber(const CSubscriber&) = delete;

/**
* @brief CSubscribers are non-copyable
**/
CSubscriber& operator=(const CSubscriber&) = delete;
ECAL_API CSubscriber& operator=(const CSubscriber&) = delete;

/**
* @brief CSubscribers are move-enabled
**/
CSubscriber(CSubscriber&& rhs) noexcept;
ECAL_API CSubscriber(CSubscriber&& rhs) noexcept;

/**
* @brief CSubscribers are move-enabled
**/
CSubscriber& operator=(CSubscriber&& rhs) noexcept;
ECAL_API CSubscriber& operator=(CSubscriber&& rhs) noexcept;

/**
* @brief Creates this object.
Expand All @@ -147,7 +148,7 @@ namespace eCAL
* @return true if it succeeds, false if it fails.
**/
[[deprecated("Please use the create method bool Create(const std::string& topic_name_, const STopicInformation& topic_info_) instead. This function will be removed in eCAL6.")]]
bool Create(const std::string& topic_name_, const std::string& topic_type_, const std::string& topic_desc_ = "");
ECAL_API bool Create(const std::string& topic_name_, const std::string& topic_type_, const std::string& topic_desc_ = "");

/**
* @brief Creates this object.
Expand All @@ -156,7 +157,7 @@ namespace eCAL
*
* @return True if it succeeds, false if it fails.
**/
bool Create(const std::string& topic_name_) {
ECAL_API bool Create(const std::string& topic_name_) {
return Create(topic_name_, STopicInformation{});
}

Expand All @@ -168,14 +169,14 @@ namespace eCAL
*
* @return True if it succeeds, false if it fails.
**/
bool Create(const std::string& topic_name_, const STopicInformation& topic_info_);
ECAL_API bool Create(const std::string& topic_name_, const STopicInformation& topic_info_);

/**
* @brief Destroys this object.
*
* @return true if it succeeds, false if it fails.
**/
bool Destroy();
ECAL_API bool Destroy();

/**
* @brief Set subscriber quality of service attributes.
Expand All @@ -184,14 +185,14 @@ namespace eCAL
*
* @return True if it succeeds, false if it fails.
**/
bool SetQOS(const QOS::SReaderQOS& qos_);
ECAL_API bool SetQOS(const QOS::SReaderQOS& qos_);

/**
* @brief Get current subscriber quality of service attributes.
*
* @return Quality of service attributes.
**/
QOS::SReaderQOS GetQOS();
ECAL_API QOS::SReaderQOS GetQOS();

/**
* @brief Set a set of id's to prefiltering topics (see CPublisher::SetID).
Expand All @@ -200,7 +201,7 @@ namespace eCAL
*
* @return True if it succeeds, false if it fails.
**/
bool SetID(const std::set<long long>& id_set_);
ECAL_API bool SetID(const std::set<long long>& id_set_);

/**
* @brief Sets subscriber attribute.
Expand All @@ -211,7 +212,7 @@ namespace eCAL
* @return True if it succeeds, false if it fails.
* @experimental
**/
bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_);
ECAL_API bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_);

/**
* @brief Removes subscriber attribute.
Expand All @@ -221,7 +222,7 @@ namespace eCAL
* @return True if it succeeds, false if it fails.
* @experimental
**/
bool ClearAttribute(const std::string& attr_name_);
ECAL_API bool ClearAttribute(const std::string& attr_name_);

/**
* @brief Receive a message from the publisher.
Expand All @@ -233,7 +234,7 @@ namespace eCAL
* @return Length of received buffer.
**/
[[deprecated]]
size_t Receive(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ = 0) const;
ECAL_API size_t Receive(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ = 0) const;

/**
* @brief Receive a message from the publisher (able to process zero length buffer).
Expand All @@ -244,7 +245,7 @@ namespace eCAL
*
* @return True if it succeeds, false if it fails.
**/
bool ReceiveBuffer(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ = 0) const;
ECAL_API bool ReceiveBuffer(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ = 0) const;

/**
* @brief Add callback function for incoming receives.
Expand All @@ -253,14 +254,14 @@ namespace eCAL
*
* @return True if succeeded, false if not.
**/
bool AddReceiveCallback(ReceiveCallbackT callback_);
ECAL_API bool AddReceiveCallback(ReceiveCallbackT callback_);

/**
* @brief Remove callback function for incoming receives.
*
* @return True if succeeded, false if not.
**/
bool RemReceiveCallback();
ECAL_API bool RemReceiveCallback();

/**
* @brief Add callback function for subscriber events.
Expand All @@ -270,7 +271,7 @@ namespace eCAL
*
* @return True if succeeded, false if not.
**/
bool AddEventCallback(eCAL_Subscriber_Event type_, SubEventCallbackT callback_);
ECAL_API bool AddEventCallback(eCAL_Subscriber_Event type_, SubEventCallbackT callback_);

/**
* @brief Remove callback function for subscriber events.
Expand All @@ -279,51 +280,51 @@ namespace eCAL
*
* @return True if succeeded, false if not.
**/
bool RemEventCallback(eCAL_Subscriber_Event type_);
ECAL_API bool RemEventCallback(eCAL_Subscriber_Event type_);

/**
* @brief Query if this object is created.
*
* @return true if created, false if not.
**/
bool IsCreated() const {return(m_created);}
ECAL_API bool IsCreated() const {return(m_created);}

/**
* @brief Query the number of publishers.
*
* @return Number of publishers.
**/
size_t GetPublisherCount() const;
ECAL_API size_t GetPublisherCount() const;

/**
* @brief Gets name of the connected topic.
*
* @return The topic name.
**/
std::string GetTopicName() const;
ECAL_API std::string GetTopicName() const;

/**
* @brief Gets type of the connected topic.
*
* @return The type name.
**/
[[deprecated("Please use the method STopicInformation GetTopicInformation() instead. You can extract the typename from the STopicInformation variable. This function will be removed in eCAL6.")]]
std::string GetTypeName() const;
ECAL_API std::string GetTypeName() const;

/**
* @brief Gets description of the connected topic.
*
* @return The description.
**/
[[deprecated("Please use the method STopicInformation GetTopicInformation() instead. You can extract the descriptor from the STopicInformation variable. This function will be removed in eCAL6.")]]
std::string GetDescription() const;
ECAL_API std::string GetDescription() const;

/**
* @brief Gets description of the connected topic.
*
* @return The topic information.
**/
STopicInformation GetTopicInformation() const;
ECAL_API STopicInformation GetTopicInformation() const;

/**
* @brief Set the timeout parameter for triggering
Expand All @@ -333,7 +334,7 @@ namespace eCAL
*
* @return True if succeeded, false if not.
**/
bool SetTimeout(int timeout_);
ECAL_API bool SetTimeout(int timeout_);

/**
* @brief Dump the whole class state into a string.
Expand All @@ -342,14 +343,14 @@ namespace eCAL
*
* @return The dump sting.
**/
std::string Dump(const std::string& indent_ = "") const;
ECAL_API std::string Dump(const std::string& indent_ = "") const;

protected:
void InitializeQOS();
bool ApplyTopicToDescGate(const std::string& topic_name_, const STopicInformation& topic_info_);

// class members
CDataReader* m_datareader;
std::shared_ptr<CDataReader> m_datareader;
struct ECAL_API QOS::SReaderQOS m_qos;
bool m_created;
bool m_initialized;
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ namespace eCAL
m_share_desc = state_;
}

bool CPubGate::Register(const std::string& topic_name_, CDataWriter* datawriter_)
bool CPubGate::Register(const std::string& topic_name_, const std::shared_ptr<CDataWriter>& datawriter_)
{
if(!m_created) return(false);

// register writer and multicast group
const std::unique_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
m_topic_name_datawriter_map.emplace(std::pair<std::string, CDataWriter*>(topic_name_, datawriter_));
m_topic_name_datawriter_map.emplace(std::pair<std::string, std::shared_ptr<CDataWriter>>(topic_name_, datawriter_));

return(true);
}

bool CPubGate::Unregister(const std::string& topic_name_, CDataWriter* datawriter_)
bool CPubGate::Unregister(const std::string& topic_name_, const std::shared_ptr<CDataWriter>& datawriter_)
{
if(!m_created) return(false);
bool ret_state = false;
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/pubsub/ecal_pubgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ namespace eCAL
void ShareDescription(bool state_);
bool DescriptionShared() const { return m_share_desc; };

bool Register(const std::string& topic_name_, CDataWriter* datawriter_);
bool Unregister(const std::string& topic_name_, CDataWriter* datawriter_);
bool Register(const std::string& topic_name_, const std::shared_ptr<CDataWriter>& datawriter_);
bool Unregister(const std::string& topic_name_, const std::shared_ptr<CDataWriter>& datawriter_);

void ApplyLocSubRegistration(const eCAL::pb::Sample& ecal_sample_);
void ApplyLocSubUnregistration(const eCAL::pb::Sample& ecal_sample_);
Expand All @@ -67,7 +67,7 @@ namespace eCAL
bool m_share_type;
bool m_share_desc;

using TopicNameDataWriterMapT = std::multimap<std::string, CDataWriter *>;
using TopicNameDataWriterMapT = std::multimap<std::string, std::shared_ptr<CDataWriter>>;
std::shared_timed_mutex m_topic_name_datawriter_sync;
TopicNameDataWriterMapT m_topic_name_datawriter_map;
};
Expand Down
5 changes: 2 additions & 3 deletions ecal/core/src/pubsub/ecal_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ namespace eCAL
if (m_tlayer.sm_inproc == TLayer::smode_none) m_tlayer.sm_inproc = Config::GetPublisherInprocMode();

// create data writer
m_datawriter = new CDataWriter();
m_datawriter = std::make_shared<CDataWriter>();
// set qos
m_datawriter->SetQOS(m_qos);
// set transport layer
Expand Down Expand Up @@ -188,8 +188,7 @@ namespace eCAL
#endif

// free datawriter
delete m_datawriter;
m_datawriter = nullptr;
m_datawriter.reset();

// we made it :-)
m_created = false;
Expand Down
Loading

0 comments on commit 7794a16

Please sign in to comment.