Skip to content

Commit

Permalink
Fixed most tests
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianReimold committed Jul 3, 2023
1 parent 080f1d9 commit 96e527c
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 160 deletions.
5 changes: 3 additions & 2 deletions ecal/core/include/ecal/ecal_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include <string>
#include <vector>
#include <memory>

namespace eCAL
{
Expand Down Expand Up @@ -151,8 +152,8 @@ namespace eCAL
**/
bool IsConnected();

protected:
CServiceServerImpl* m_service_server_impl;
private:
std::shared_ptr<CServiceServerImpl> m_service_server_impl;
bool m_created;
};
}
3 changes: 3 additions & 0 deletions ecal/core/src/ecal_globals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ namespace eCAL

if (components_ & Init::Service)
{
// Reset the service manager, so it will be able to create new services, again
eCAL::service::ServiceManager::instance()->reset();

/////////////////////
// SERVICE GATE
/////////////////////
Expand Down
222 changes: 133 additions & 89 deletions ecal/core/src/service/ecal_service_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,21 +214,20 @@ namespace eCAL
}
}
return false;

// TODO: Call timeout callbacks here??????
}

// blocking call, all responses will be returned in service_response_vec_
bool CServiceClientImpl::Call(const std::string& method_name_, const std::string& request_, int timeout_, ServiceResponseVecT* service_response_vec_)
std::shared_ptr<std::vector<std::pair<bool, eCAL::SServiceResponse>>>
CServiceClientImpl::CallBlocking(const std::string& method_name_
, const std::string& request_
, std::chrono::nanoseconds timeout_)
{
if (g_clientgate() == nullptr) return false;
if (!m_created) return false;

if (m_service_name.empty()
|| method_name_.empty()
)
return false;
if (g_clientgate() == nullptr) return nullptr;
if (!m_created) return nullptr;

// reset response
if(service_response_vec_ != nullptr) service_response_vec_->clear();
if (m_service_name.empty() || method_name_.empty())
return nullptr;

// check for new server
CheckForNewServices();
Expand All @@ -249,7 +248,8 @@ namespace eCAL
// sure the callbacks can still operate on those variables.
const auto mutex = std::make_shared<std::mutex>();
const auto condition_variable = std::make_shared<std::condition_variable>();
const auto responses = std::make_shared<ServiceResponseVecT>();
const auto responses = std::make_shared<std::vector<std::pair<bool, SServiceResponse>>>(); // Vector with [has_returned, response] pairs, so we know where a timeout has happened.
const auto block_modifying_responses = std::make_shared<bool>(false);
const auto finished_service_call_count = std::make_shared<int>(0);

const auto expected_service_call_count = std::make_shared<int>(0);
Expand All @@ -276,34 +276,40 @@ namespace eCAL
const std::lock_guard<std::mutex> lock(*mutex);
(*expected_service_call_count)++;
responses->emplace_back();
responses->back().host_name = service.hname;
responses->back().service_name = service.sname;
responses->back().service_id = service.key;
responses->back().method_name = method_name_;
responses->back().error_msg = "Timeout";
responses->back().ret_state = 0;
responses->back().call_state = eCallState::call_state_failed;
responses->back().response = "";
// TODO: Looks like I need to also call some event callback here, for every call that timeouted
responses->back().first = false; // If this stays false, we have a timout
responses->back().second.host_name = service.hname;
responses->back().second.service_name = service.sname;
responses->back().second.service_id = service.key;
responses->back().second.method_name = method_name_;
responses->back().second.error_msg = "Timeout";
responses->back().second.ret_state = 0;
responses->back().second.call_state = eCallState::call_state_failed;
responses->back().second.response = "";

// Create a response callback, that will set the response and notify the condition variable
response_callback
= [mutex, condition_variable, responses, finished_service_call_count, i = (responses->size() - 1)]
= [mutex, condition_variable, responses, block_modifying_responses, finished_service_call_count, i = (responses->size() - 1)]
(const eCAL::service::Error& response_error, const std::shared_ptr<std::string>& response_)
{
const std::lock_guard<std::mutex> lock(*mutex);

if (response_error)
{
(*responses)[i].error_msg = response_error.ToString();
(*responses)[i].call_state = eCallState::call_state_failed;
(*responses)[i].ret_state = 0;
}
else
if (!(*block_modifying_responses))
{
fromSerializedProtobuf(*response_, (*responses)[i]);
}
// This calback has not timeouted. This does not tell us anything about the success, though.
(*responses)[i].first = true;

if (response_error)
{
(*responses)[i].second.error_msg = response_error.ToString();
(*responses)[i].second.call_state = eCallState::call_state_failed;
(*responses)[i].second.ret_state = 0;
}
else
{
fromSerializedProtobuf(*response_, (*responses)[i].second);
}
}

(*finished_service_call_count)++;
condition_variable->notify_all();
};
Expand All @@ -320,15 +326,16 @@ namespace eCAL
// Lock mutex, call service asynchronously and wait for the condition variable to be notified
{
std::unique_lock<std::mutex> lock(*mutex);
if (timeout_ > 0)
if (timeout_ > std::chrono::nanoseconds::zero())
{
condition_variable->wait_for(lock
, std::chrono::milliseconds(timeout_)
, timeout_
, [&expected_service_call_count, &finished_service_call_count]()
{
// Wait for all services to return something
return *expected_service_call_count == *finished_service_call_count;
});

}
else
{
Expand All @@ -338,85 +345,122 @@ namespace eCAL
return *expected_service_call_count == *finished_service_call_count;
});
}
// Stop the callbacks from modifying the responses vector. This is important
//in a timeout case, as we want to preserve the vector from when the timeout
// occured. There is no way to stop the service calls from succeeding after
// that, but we don't want their values, any more.
*block_modifying_responses = true;

return responses;
}
}

// blocking call, all responses will be returned in service_response_vec_
bool CServiceClientImpl::Call(const std::string& method_name_, const std::string& request_, int timeout_, ServiceResponseVecT* service_response_vec_)
{
auto responses = CallBlocking(method_name_, request_, std::chrono::milliseconds(timeout_));

if (service_response_vec_)
service_response_vec_->clear();

if (!responses)
{
return false;
}
else
{
// Copy our temporary return vector to the user's return vector
service_response_vec_->resize(responses->size());
for (int i = 0; i < responses->size(); i++)
{
(*service_response_vec_)[i] = (*responses)[i].second;
}

// Now just copy our temporary return vector to the user's return vector
if (service_response_vec_ != nullptr)
// Call the timeout callback for all services that have not returned yet
if (timeout_ > 0)
{
service_response_vec_->clear();
service_response_vec_->resize(responses->size());
for (int i = 0; i < responses->size(); i++)
std::lock_guard<std::mutex> const lock_eb(m_event_callback_map_sync);
auto callback_it = m_event_callback_map.find(eCAL_Client_Event::client_event_timeout);
if (callback_it != m_event_callback_map.end())
{
(*service_response_vec_)[i] = (*responses)[i];
for (const auto& return_pair : (*responses))
{
if (!return_pair.first)
{
SClientEventCallbackData sdata;
sdata.type = client_event_timeout;
sdata.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
(callback_it->second)(m_service_name.c_str(), &sdata);
}
}
}
}

// Determine if any call has been successful
for (int i = 0; i < responses->size(); i++)
{
if ((*responses)[i].call_state == call_state_executed)
if ((*responses)[i].second.call_state == call_state_executed)
return true;
}
return false;
}


//=== Old code, TODO: remove! ====

//bool called(false);
//std::vector<SServiceAttr> const service_vec = g_clientgate()->GetServiceAttr(m_service_name);
//for (const auto& iter : service_vec)
//{
// if (m_host_name.empty() || (m_host_name == iter.hname))
// {
// std::lock_guard<std::mutex> const lock(m_client_map_sync);
// auto client = m_client_map.find(iter.key);
// if (client != m_client_map.end())
// {
// struct SServiceResponse service_response;
// if (SendRequest(client->second, method_name_, request_, timeout_, service_response))
// {
// if(service_response_vec_ != nullptr) service_response_vec_->push_back(service_response);
// called = true;
// }
// }
// }
//}
//return called;
}

// blocking call, using callback
bool CServiceClientImpl::Call(const std::string& method_name_, const std::string& request_, int timeout_)
{
// Create response vector
ServiceResponseVecT response_vec;

// Call all services blocking
bool success = Call(method_name_, request_, timeout_, &response_vec);
auto responses = CallBlocking(method_name_, request_, std::chrono::milliseconds(timeout_));

// iterate over responses and call the callbacks
for (const auto& response : response_vec)
if (!responses)
{
std::lock_guard<std::mutex> const lock_cb(m_response_callback_sync);
if (m_response_callback) m_response_callback(response);
return false;
}
else
{
// iterate over responses and call the response callbacks
{
std::lock_guard<std::mutex> const lock_cb(m_response_callback_sync);
if (m_response_callback)
{
for (const auto& response_pair : *responses)
{
if (response_pair.first)
{
// Call callback, if the call didn't timeout
m_response_callback(response_pair.second);
}
}
}
}

return success;

// ========== Old Code, TODO: Remove ==========

//if (g_clientgate() == nullptr) return false;
//if (!m_created) return false;

//if (m_service_name.empty()
// || method_name_.empty()
// )
// return false;

//// check for new server
//CheckForNewServices();
// iterate over responses and call the timeout callbacks
if (timeout_ > 0)
{
std::lock_guard<std::mutex> const lock_eb(m_event_callback_map_sync);
auto callback_it = m_event_callback_map.find(eCAL_Client_Event::client_event_timeout);
if (callback_it != m_event_callback_map.end())
{
for (const auto& return_pair : (*responses))
{
if (!return_pair.first)
{
SClientEventCallbackData sdata;
sdata.type = client_event_timeout;
sdata.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
(callback_it->second)(m_service_name.c_str(), &sdata);
}
}
}
}

//// send request to every single service
//return SendRequests(m_host_name, method_name_, request_, timeout_);
// Determine if any call has been successful and return true, if so.
for (int i = 0; i < responses->size(); i++)
{
if ((*responses)[i].second.call_state == call_state_executed)
return true;
}
return false;
}
}

// asynchronously call, using callback
Expand Down
4 changes: 4 additions & 0 deletions ecal/core/src/service/ecal_service_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ namespace eCAL
[[deprecated]]
bool Call(const std::string& method_name_, const std::string& request_, struct SServiceResponse& service_response_);

private:
std::shared_ptr<std::vector<std::pair<bool, eCAL::SServiceResponse>>> CallBlocking(const std::string& method_name_, const std::string& request_, std::chrono::nanoseconds timeout_);

public:
// blocking call, all responses will be returned in service_response_vec_
bool Call(const std::string& method_name_, const std::string& request_, int timeout_, ServiceResponseVecT* service_response_vec_);

Expand Down
7 changes: 3 additions & 4 deletions ecal/core/src/service/ecal_service_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ namespace eCAL
{
if(m_created) return(false);

m_service_server_impl = new CServiceServerImpl(service_name_);
m_service_server_impl = CServiceServerImpl::CreateInstance(service_name_);

// register this service
if (g_servicegate() != nullptr) g_servicegate()->Register(m_service_server_impl);
if (g_servicegate() != nullptr) g_servicegate()->Register(m_service_server_impl.get());

m_created = true;
return(true);
Expand All @@ -93,10 +93,9 @@ namespace eCAL
m_created = false;

// unregister this service
if (g_servicegate() != nullptr) g_servicegate()->Unregister(m_service_server_impl);
if (g_servicegate() != nullptr) g_servicegate()->Unregister(m_service_server_impl.get());

m_service_server_impl->Destroy();
delete m_service_server_impl;
m_service_server_impl = nullptr;

return(true);
Expand Down
Loading

0 comments on commit 96e527c

Please sign in to comment.