16 #ifndef _SAG_CONNECTIVITY_PLUGINS_HPP_ 17 #define _SAG_CONNECTIVITY_PLUGINS_HPP_ 25 #ifndef __STDC_FORMAT_MACROS 26 #define __STDC_FORMAT_MACROS 1 31 namespace softwareag {
32 namespace _DATAT_INTERNAL_CPP_NAMESPACE {
39 void replace(std::string &input,
const std::string &from,
const std::string &to)
42 while((pos = input.find(from, pos)) != std::string::npos)
44 input.replace(pos, from.length(), to);
52 std::string
to_string(
const Message &m,
bool truncate=
true)
54 std::string payload =
to_string(m.getPayload());
55 if (truncate && payload.length() > 200) payload = payload.substr(0, 196) +
" ...";
57 replace(payload,
"\n",
"\\n");
58 replace(payload,
"\r",
"\\r");
59 return "Message<metadata="+
to_string(m.getMetadataMap())+
", payload="+payload+
">";
87 typedef std::unique_ptr<PluginHost> ptr_t;
104 if (SAG_ERROR_OK != sag_enable_reliability(chain, static_cast<int>(direction))) {
105 throw std::runtime_error(
"An error occurred while setting chain reliability");
123 if (SAG_ERROR_OK != sag_enable_queue_flush_messages(chain)) {
124 throw std::runtime_error(
"An error occurred while enabling queue flush messages");
130 bool isShuttingDown =
false;
131 if (SAG_ERROR_OK != sag_is_host_shutting_down(chain, isShuttingDown)) {
132 throw std::runtime_error(
"An error occurred while checking if host is shutting down");
134 return isShuttingDown;
140 PluginHost(
void* chain =
nullptr) :chain(chain) {}
187 : chainId(chainId), pluginName(pluginName), config(config.copy()), connectivityManager(connectivityManager), chain(chain)
192 std::string pluginName;
195 void* connectivityManager;
273 sag_delete_user_status_item(connectivityManager, underlying);
274 underlying.item =
nullptr;
286 std::unique_lock<std::mutex> ul(status_lock);
287 setStatusLocked(value);
300 std::unique_lock<std::mutex> ul(status_lock);
302 setStatusLocked(convert_to_details::integerToString(value));
313 std::unique_lock<std::mutex> ul(status_lock);
326 if (incrementValue == 0)
return;
327 std::unique_lock<std::mutex> ul(status_lock);
328 intValue += incrementValue;
329 setStatusLocked(convert_to_details::integerToString(intValue));
334 const std::string &
key() {
return mkey; }
340 StatusItem(
void* connectivityManager,
const std::string &key,
const std::string &initialValue,
const int64_t intValue)
341 : intValue(intValue),
343 lastValue(initialValue),
344 connectivityManager(connectivityManager),
345 underlying(sag_create_user_status_item(connectivityManager, key.c_str(), initialValue.c_str()))
347 if (!underlying.item)
349 std::ostringstream oss;
350 oss <<
"Failed to create status item '" << key <<
"' (ensure the key is unique and that this plug-in has not been shutdown already)";
351 throw std::runtime_error(oss.str());
355 void setStatusLocked(
const std::string &value)
357 if (value == lastValue)
return;
360 sag_set_user_status_item(underlying, value.c_str());
364 const std::string mkey;
365 std::string lastValue;
366 void* connectivityManager;
367 sag_status_item_t underlying;
368 std::mutex status_lock;
390 return std::unique_ptr<StatusItem>(
new StatusItem(connectivityManager, key, initialValue, 0));
408 std::ostringstream oss;
410 return std::unique_ptr<StatusItem>(
new StatusItem(connectivityManager, key, oss.str(), initialValue));
426 sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(statusmap));
428 for (
auto it = statusmap.
cbegin(); it != statusmap.
cend(); it++)
431 if (it->second.empty())
432 mapKeysToCleanup.erase(it->first);
434 mapKeysToCleanup.insert(it->first.copy(),
data_t());
445 if (!mapKeysToCleanup.empty())
446 sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(mapKeysToCleanup));
452 explicit StatusReporter(
void *connectivityManager) : connectivityManager(connectivityManager), mapKeysToCleanup()
457 void* connectivityManager;
458 map_t mapKeysToCleanup;
461 StatusReporter() =
delete;
464 StatusReporter(
const StatusReporter& other) =
delete;
465 StatusReporter& operator=(
const StatusReporter&) =
delete;
484 host(
new PluginHost(params.chain)), logger(
"connectivity." + pluginName +
"." + chainId),
543 const std::string &
getName()
const {
return pluginName; }
567 if (statusReporter)
return *statusReporter;
568 throw std::runtime_error(
"Cannot call getStatusReporter when using the legacy constructor");
577 std::unique_ptr<StatusReporter> statusReporter;
590 typedef std::unique_ptr<HostSide>
ptr_t;
610 virtual void sendBatchTowardsHost(
Message *start,
Message *end) = 0;
620 sendBatchTowardsHost(&message, &message+1);
633 template<
typename IT>
635 !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
636 ampl::is_same<
Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
639 if(begin == end) sendBatchTowardsHost((
Message*)
nullptr, (
Message*)
nullptr);
640 else sendBatchTowardsHost(&(*begin), (&(*(end-1)))+1);
648 class RemoteHostSide:
public HostSide
651 RemoteHostSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
652 virtual ~RemoteHostSide() {}
653 virtual void sendBatchTowardsHost(Message *start, Message *end);
669 typedef std::unique_ptr<TransportSide>
ptr_t;
690 virtual void sendBatchTowardsTransport(
Message *start,
Message *end) = 0;
700 sendBatchTowardsTransport(&message, &message+1);
712 template<
typename IT>
714 !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
715 ampl::is_same<
Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
718 if(begin == end) sendBatchTowardsTransport((
Message*)
nullptr, (
Message*)
nullptr);
719 else sendBatchTowardsTransport(&(*begin), (&(*(end-1)))+1);
727 class RemoteTransportSide:
public TransportSide
730 RemoteTransportSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
731 virtual ~RemoteTransportSide() {}
732 virtual void sendBatchTowardsTransport(Message *start, Message *end);
781 hostSide = std::move(host);
784 virtual void setNextTowardsTransport(TransportSide::ptr_t &&transport)
786 transportSide = std::move(transport);
851 hostSide = std::move(host);
897 for (
Message *it = start; it != end; ++it) {
899 if (it->getPayload().empty()) {
900 deliverNullPayloadTowardsTransport(*it);
902 deliverMessageTowardsTransport(*it);
905 handleException(*it);
910 virtual void deliverMessageTowardsTransport(
Message &msg) = 0;
939 }
catch (
const std::exception &e) {
940 logger.warn(
"Error while delivering message: %s; %s will be dropped.", e.what(),
to_string(m).c_str());
942 logger.warn(
"Unknown error delivering message: %s",
to_string(m).c_str());
987 for (
Message *it = start; it != end; ++it) {
990 if (it->getPayload().empty()) {
991 rv = transformNullPayloadTowardsHost(*it);
993 rv = transformMessageTowardsHost(*it);
996 rv = handleException(*it,
false);
1000 if (it != curr) it->swap(std::move(*curr));
1004 if (hostSide.get()) hostSide->sendBatchTowardsHost(start, curr);
1015 for (
Message *it = start; it != end; ++it) {
1019 if (it->getPayload().empty()) {
1020 rv = transformNullPayloadTowardsTransport(*it);
1022 rv = transformMessageTowardsTransport(*it);
1025 rv = handleException(*it,
true);
1029 if (it != curr) it->swap(std::move(*curr));
1033 if (transportSide.get()) transportSide->sendBatchTowardsTransport(start, curr);
1040 virtual bool transformMessageTowardsHost(
Message &msg) = 0;
1045 virtual bool transformMessageTowardsTransport(
Message &msg) = 0;
1089 }
catch (
const std::exception &e) {
1090 logger.warn(
"Error while transforming message: %s; %s will be dropped.", e.what(),
to_string(m).c_str());
1092 logger.warn(
"Unknown error transforming message: %s",
to_string(m).c_str());
1100 namespace connectivity {
using namespace _DATAT_INTERNAL_CPP_NAMESPACE; }
1105 #include <sag_internal/exception.hpp> 1106 #include <sag_internal/remote_plugins.hpp> 1107 #include <sag_internal/plugin_macros.hpp> 1118 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class) 1129 #define SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class) 1131 #endif // _SAG_CONNECTIVITY_PLUGINS_HPP_ A container for parameters passed to the constructor of a codec plug-in.
Definition: sag_connectivity_plugins.hpp:213
const std::string pluginName
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:548
Base class that simplifies implementation of codec plug-ins that deal only with individual messages n...
Definition: sag_connectivity_plugins.hpp:962
AbstractTransport(const PluginConstructorParameters::TransportConstructorParameters ¶ms)
Constructor.
Definition: sag_connectivity_plugins.hpp:843
Base class that simplifies implementation of transport plug-ins that deal only with individual messag...
Definition: sag_connectivity_plugins.hpp:873
void increment(int64_t incrementValue=1)
Set an integer status value by incrementing the previous integer value that was set by this object.
Definition: sag_connectivity_plugins.hpp:324
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:793
An interface to the next component (plugin or host) towards the host.
Definition: sag_connectivity_plugins.hpp:584
const_iterator cbegin() const
Forward const_iterator begin.
Definition: sag_connectivity_cpp.hpp:313
map_t config
The configuration of this plug-in.
Definition: sag_connectivity_plugins.hpp:552
item_ptr createStatusItem(const std::string &key, int64_t initialValue)
Creates a StatusItem instance that can be used to report status for a given key, using an integral in...
Definition: sag_connectivity_plugins.hpp:406
Direction
The enumeration indicating the direction of message flow - towards the transport or towards the host.
Definition: sag_connectivity_plugins.hpp:68
Class for writing to the system logger.
Definition: sag_plugin_logging.hpp:71
const PluginHost::ptr_t host
Interface to support miscellaneous requests from this plug-in to the host system.
Definition: sag_connectivity_plugins.hpp:558
StatusReporter & getStatusReporter()
Allows reporting status information from this plug-in, such as online or failed status and number of ...
Definition: sag_connectivity_plugins.hpp:566
const_iterator cend() const
Forward const_iterator end.
Definition: sag_connectivity_cpp.hpp:315
Contains the C++ implementation of the underlying datatypes used by connectivity plugins and their ac...
void setStatus(const map_t &statusmap)
Set multiple related string status values at the same time (atomically).
Definition: sag_connectivity_plugins.hpp:424
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling deliverMessageTowardsTransport(Message&) for each message individua...
Definition: sag_connectivity_plugins.hpp:895
Base of the inheritance tree for Connectivity plugins.
Definition: sag_connectivity_plugins.hpp:476
TransportSide::ptr_t transportSide
The next plug-in in the chain towards transport.
Definition: sag_connectivity_plugins.hpp:799
std::unique_ptr< StatusItem > item_ptr
Unique pointer to a StatusItem.
Definition: sag_connectivity_plugins.hpp:372
void setStatus(const std::string &value)
Set a string status value.
Definition: sag_connectivity_plugins.hpp:284
AbstractSimpleTransport(const PluginConstructorParameters::TransportConstructorParameters ¶ms)
Constructor.
Definition: sag_connectivity_plugins.hpp:885
Logger logger
Logging for writing to the host log file.
Definition: sag_connectivity_plugins.hpp:575
virtual bool transformNullPayloadTowardsTransport(Message &msg)
Transform a message with a null payload in a transport-wards direction.
Definition: sag_connectivity_plugins.hpp:1059
virtual void start()
Called when an entire chain has been created and the plugin is allowed to start up (after all plugins...
Definition: sag_connectivity_plugins.hpp:506
auto sendBatchTowardsTransport(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&l::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:713
std::enable_if< get_underlying< T >::value, std::string >::type to_string(const T &t)
Get a string representation of t.
A class allowing a plug-in to report status values to the host.
Definition: sag_connectivity_plugins.hpp:240
std::unique_ptr< TransportSide > ptr_t
Pointers to TransportSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:669
virtual ~TransportSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:667
Base class for transport plug-ins.
Definition: sag_connectivity_plugins.hpp:830
static const char * STATUS_ONLINE()
Returns a constant that should be used as the status value when a component is online,...
Definition: sag_connectivity_plugins.hpp:247
item_ptr createStatusItem(const std::string &key, const std::string &initialValue)
Creates a StatusItem instance that can be used to report status for a given key.
Definition: sag_connectivity_plugins.hpp:388
const std::string & getName() const
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:543
bool isShuttingDown()
Check if host is shutting down.
Definition: sag_connectivity_plugins.hpp:129
virtual void handleException(Message &m)
Handle an exception thrown while delivering a message towards the transport.
Definition: sag_connectivity_plugins.hpp:935
void enableQueueFlushMessages()
Enable notifications about queue flushes.
Definition: sag_connectivity_plugins.hpp:122
static const char * STATUS_FAILED()
Returns a constant that should be used as the status value when a component is not currently operatio...
Definition: sag_connectivity_plugins.hpp:255
virtual ~HostSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:588
AbstractSimpleCodec(const PluginConstructorParameters::CodecConstructorParameters ¶ms)
Constructor.
Definition: sag_connectivity_plugins.hpp:974
virtual void hostReady()
Called some time after start(), when the host is ready to start receiving input (sends will be queued...
Definition: sag_connectivity_plugins.hpp:515
Base class for codec plug-ins.
Definition: sag_connectivity_plugins.hpp:760
void sendBatchTowardsHost(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:619
const std::string & getPluginName() const
Get the name used in the configuration file for this plug-in.
Definition: sag_connectivity_plugins.hpp:180
static const char * STATUS_STARTING()
Returns a constant that should be used as the status value when a component is still starting,...
Definition: sag_connectivity_plugins.hpp:251
void setStatus(int64_t value)
Set an integer status value.
Definition: sag_connectivity_plugins.hpp:298
A map class which implements many of the functions on std::map.
Definition: sag_connectivity_cpp.hpp:35
const std::string & key()
Get the unique key specified when this status item was created.
Definition: sag_connectivity_plugins.hpp:334
A class that can be used to efficiently update the value associated with a single status key.
Definition: sag_connectivity_plugins.hpp:266
A container for an payload and associated metadata.
Definition: sag_connectivity_cpp.hpp:26
virtual void shutdown()
Stop processing messages and terminate and join any background threads.
Definition: sag_connectivity_plugins.hpp:541
virtual void sendBatchTowardsHost(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsHost(Message &) for each message individuall...
Definition: sag_connectivity_plugins.hpp:984
An interface to the next component (plugin or host) towards the transport.
Definition: sag_connectivity_plugins.hpp:663
AbstractCodec(const PluginConstructorParameters::CodecConstructorParameters ¶ms)
Constructor.
Definition: sag_connectivity_plugins.hpp:773
auto sendBatchTowardsHost(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&l::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:634
map_t copy() const
Return a deep copy of this map.
Definition: sag_connectivity_cpp.hpp:242
virtual void deliverNullPayloadTowardsTransport(Message &msg)
Deliver a message with a null payload.
Definition: sag_connectivity_plugins.hpp:912
void enableReliability(Direction direction)
Enable reliable messaging for the chain that this plug-in belongs to, in a particular direction i....
Definition: sag_connectivity_plugins.hpp:103
Contains the headers needed to implement your own Connectivity Plugins.
const std::string & getChainId() const
Get the identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:175
A container for parameters passed to the constructor of a transport plug-in.
Definition: sag_connectivity_plugins.hpp:201
The direction of messages flowing towards the host (from the transport).
Interface to support miscellaneous requests from a particular plug-in to the host system.
Definition: sag_connectivity_plugins.hpp:84
A base interface for parameters passed to the constructor of transport or codec plug-ins.
Definition: sag_connectivity_plugins.hpp:156
virtual bool handleException(Message &m, bool towardsTransport)
Handle an exception thrown while delivering a message.
Definition: sag_connectivity_plugins.hpp:1085
utf8-encoded const char*
Definition: sag_connectivity_c.h:45
const std::string chainId
The identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:550
The direction of messages flowing towards the transport (from the host).
void sendBatchTowardsTransport(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:699
std::string getStatus()
Return the value this status item was set to most recently by this class.
Definition: sag_connectivity_plugins.hpp:312
A variant type which can be one of the following:
Definition: sag_connectivity_cpp.hpp:41
std::unique_ptr< HostSide > ptr_t
Pointers to HostSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:590
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsTransport(Message &) for each message indivi...
Definition: sag_connectivity_plugins.hpp:1012
const map_t & getConfig() const
Get the configuration for this plug-in.
Definition: sag_connectivity_plugins.hpp:170
virtual ~Plugin()
This destructor must be virtual.
Definition: sag_connectivity_plugins.hpp:494
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:859
virtual bool transformNullPayloadTowardsHost(Message &msg)
Transform a message with a null payload in a host-wards direction.
Definition: sag_connectivity_plugins.hpp:1050