From 5317bbec0ee963b97da48e7df00cf952b547ba5a Mon Sep 17 00:00:00 2001 From: "Markus Kitsinger (SwooshyCueb)" Date: Thu, 17 Nov 2022 17:08:37 -0500 Subject: [PATCH] [WIP] HA AMQP --- include/irods/private/amqp_sender.hpp | 61 ++++++- src/amqp_sender.cpp | 85 ++++++++- src/main.cpp | 252 ++++++++++++++++++++------ 3 files changed, 334 insertions(+), 64 deletions(-) diff --git a/include/irods/private/amqp_sender.hpp b/include/irods/private/amqp_sender.hpp index f41ba25fb..f98ceeb05 100644 --- a/include/irods/private/amqp_sender.hpp +++ b/include/irods/private/amqp_sender.hpp @@ -3,7 +3,11 @@ #include "irods/private/audit_amqp.hpp" +#include +#include #include +#include +#include #include #include @@ -16,12 +20,49 @@ #include #include +#include +#include +#include + +// filesystem +// clang-format off +#ifdef __cpp_lib_filesystem +#include +namespace fs = std::filesystem; +#else +#include +namespace fs = boost::filesystem; +#endif +// clang-format on + namespace irods::plugin::rule_engine::audit_amqp { + class amqp_endpoint + { + public: + amqp_endpoint( + const std::string_view& _scheme, + const std::string_view& _host, + const std::uint_fast16_t& _port, + const std::string_view& _topic, + const std::string_view& _user, + const std::string_view& _password); + + const std::string url; + const std::string user; + const std::string password; + + static constexpr const std::string_view default_scheme{"amqp"}; + static constexpr const std::uint_fast16_t default_amqp_port = 5672; + static constexpr const std::uint_fast16_t default_amqps_port = 5671; + static constexpr const std::string_view default_user{""}; + static constexpr const std::string_view default_password{""}; + }; + class send_handler : public proton::messaging_handler { public: - send_handler(const proton::message& _message, const std::string& _url); + send_handler(const proton::message& _message, const std::vector& _endpoints); void on_container_start(proton::container& _container) override; void on_sendable(proton::sender& _sender) override; void on_tracker_accept(proton::tracker& _tracker) override; @@ -34,10 +75,26 @@ namespace irods::plugin::rule_engine::audit_amqp void on_error(const proton::error_condition& _err_cond) override; private: - const std::string& _amqp_url; + const std::vector& _endpoints; + std::vector::size_type _endpoint_idx; const proton::message& _message; bool _message_sent; }; + + class amqp_sender + { + public: + amqp_sender(const std::vector& _endpoints); + ~amqp_sender(); + void enable_test_mode(const fs::path _test_mode_log_path); + void disable_test_mode(); + void send_message(const nlohmann::json& _message_body, const std::uint64_t _timestamp_ms); + private: + const std::vector& _endpoints; + std::vector::size_type _endpoint_idx; + fs::path _log_file_path; + std::ofstream _log_file_ofstream; + }; } //namespace irods::plugin::rule_engine::audit_amqp #endif // IRODS_AUDIT_AMQP_SENDER_HPP diff --git a/src/amqp_sender.cpp b/src/amqp_sender.cpp index d7254bcfe..1c4b23e44 100644 --- a/src/amqp_sender.cpp +++ b/src/amqp_sender.cpp @@ -2,11 +2,23 @@ #include "irods/private/amqp_sender.hpp" #include +#include +#include namespace irods::plugin::rule_engine::audit_amqp { namespace { + BOOST_FORCEINLINE std::string build_amqp_url( + const std::string_view& _scheme, + const std::string_view& _host, + const std::uint_fast16_t& _port, + const std::string_view& _topic) + { + // TODO: use Boost.URL (if available at compile-time) to validate URLs + return fmt::format(FMT_COMPILE("{0:s}://{1:s}:{2:d}/{3:s}"), _scheme, _host, _port, _topic); + } + BOOST_FORCEINLINE void log_proton_error(const proton::error_condition& err_cond, const std::string& log_message) { // clang-format off @@ -21,8 +33,68 @@ namespace irods::plugin::rule_engine::audit_amqp } } // namespace - send_handler::send_handler(const proton::message& _message, const std::string& _url) - : _amqp_url(_url) + amqp_endpoint::amqp_endpoint( + const std::string_view& _scheme, + const std::string_view& _host, + const std::uint_fast16_t& _port, + const std::string_view& _topic, + const std::string_view& _user, + const std::string_view& _password) + : url(build_amqp_url(_scheme, _host, _port, _topic)) + , user(_user) + , password(_password) + { + } + + amqp_sender::amqp_sender(const std::vector& _endpoints) + : _endpoints(_endpoints), _endpoint_idx(static_cast::size_type>(0)) + { + } + + amqp_sender::~amqp_sender() + { + if (_log_file_ofstream.is_open()) { + _log_file_ofstream.close(); + } + } + + void amqp_sender::enable_test_mode(const fs::path _test_mode_log_path) + { + if (_log_file_ofstream.is_open()) { + _log_file_ofstream.close(); + } + _log_file_path.assign(_test_mode_log_path); + } + + void amqp_sender::disable_test_mode() + { + _log_file_path.clear(); + if (_log_file_ofstream.is_open()) { + _log_file_ofstream.close(); + } + } + + void amqp_sender::send_message(const nlohmann::json& _message_body, const std::uint64_t _timestamp_ms) + { + const std::string msg_str = _message_body.dump(); + + proton::message msg(msg_str); + msg.content_type("application/json"); + msg.creation_time(proton::timestamp(static_cast(_timestamp_ms))); + send_handler handler(msg, _endpoints); + proton::container(handler).run(); + + if (!_log_file_path.empty()) { + if (!_log_file_ofstream.is_open()) { + _log_file_ofstream.open(_log_file_path); + } + _log_file_ofstream << msg_str << std::endl; + } + } + + send_handler::send_handler(const proton::message& _message, const std::vector& _endpoints) + : _endpoints(_endpoints) + , _endpoint_idx(static_cast::size_type>(0)) , _message(_message) , _message_sent(false) { @@ -30,8 +102,15 @@ namespace irods::plugin::rule_engine::audit_amqp void send_handler::on_container_start(proton::container& _container) { + const amqp_endpoint& endpoint = _endpoints[_endpoint_idx]; proton::connection_options conn_opts; - _container.open_sender(_amqp_url, conn_opts); + if (!endpoint.user.empty()) { + conn_opts.user(endpoint.user); + } + if (!endpoint.password.empty()) { + conn_opts.password(endpoint.password); + } + _container.open_sender(endpoint.url, conn_opts); } void send_handler::on_sendable(proton::sender& _sender) diff --git a/src/main.cpp b/src/main.cpp index c810409b0..80bb5884f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -2,10 +2,12 @@ #include "irods/private/audit_amqp.hpp" #include "irods/private/audit_b64enc.hpp" #include "irods/private/amqp_sender.hpp" +#include #include #include #include #include +#include // LIST is #defined in irods/reconstants.hpp // and is an enum entry in proton/type_id.hpp @@ -33,6 +35,7 @@ #include #include #include +#include // misc includes #include @@ -42,13 +45,11 @@ // stl includes #include #include -#include #include #include #include #include #include -#include #include #include @@ -71,26 +72,34 @@ namespace irods::plugin::rule_engine::audit_amqp // NOLINTBEGIN(cert-err58-cpp, cppcoreguidelines-avoid-non-const-global-variables) const std::string_view default_pep_regex_to_match{"audit_.*"}; - const std::string_view default_amqp_url{"localhost:5672/irods_audit_messages"}; + const amqp_endpoint default_amqp_endpoint( + amqp_endpoint::default_scheme, + "localhost", + amqp_endpoint::default_amqp_port, + "irods_audit_messages", + "ANONYMOUS", + ""); const fs::path default_log_path_prefix{fs::temp_directory_path()}; const bool default_test_mode = false; std::string audit_pep_regex_to_match; - std::string audit_amqp_url; + std::vector amqp_endpoints; fs::path log_path_prefix; bool test_mode; bool warned_amqp_options = false; + bool warned_amqp_location = false; fs::path log_file_path; - std::ofstream log_file_ofstream; // audit_pep_regex is initially populated with an unoptimized default, as optimization // makes construction slower, and we don't expect it to be used before configuration is read. std::regex audit_pep_regex{audit_pep_regex_to_match, pep_regex_flavor}; + amqp_sender message_sender(amqp_endpoints); + std::mutex audit_plugin_mutex; // NOLINTEND(cert-err58-cpp, cppcoreguidelines-avoid-non-const-global-variables) } // namespace @@ -98,7 +107,8 @@ namespace irods::plugin::rule_engine::audit_amqp static BOOST_FORCEINLINE void set_default_configs() { audit_pep_regex_to_match = default_pep_regex_to_match; - audit_amqp_url = default_amqp_url; + amqp_endpoints.clear(); + amqp_endpoints.push_back(default_amqp_endpoint); test_mode = default_test_mode; log_path_prefix = default_log_path_prefix; @@ -133,9 +143,175 @@ namespace irods::plugin::rule_engine::audit_amqp audit_pep_regex_to_match = plugin_spec_cfg.at("pep_regex_to_match").get(); - const auto& amqp_topic = plugin_spec_cfg.at("amqp_topic").get_ref(); - const auto& amqp_location = plugin_spec_cfg.at("amqp_location").get_ref(); - audit_amqp_url = fmt::format(FMT_STRING("{0:s}/{1:s}"), amqp_location, amqp_topic); + std::vector amqp_endpoints_val; + const auto amqp_endpoints_cfg = plugin_spec_cfg.find("amqp_endpoints"); + if (amqp_endpoints_cfg != plugin_spec_cfg.end()) { + const auto& amqp_endpoints = *amqp_endpoints_cfg; + for (const auto& endpoint_cfg : amqp_endpoints) { + const auto scheme_cfg = endpoint_cfg.find("scheme"); + // clang-format off + const auto& scheme = scheme_cfg == endpoint_cfg.end() + ? amqp_endpoint::default_scheme + : scheme_cfg->get_ref(); + // clang-format on + + const auto& host = endpoint_cfg.at("host").get_ref(); + + const auto port_cfg = endpoint_cfg.find("port"); + std::uint_fast16_t port; + if (port_cfg == endpoint_cfg.end()) { + if (scheme == "amqp") { + port = amqp_endpoint::default_amqp_port; + } + else if (scheme == "amqps") { + port = amqp_endpoint::default_amqps_port; + } + else { + // clang-format off + log_re::error({ + {"rule_engine_plugin", rule_engine_name}, + {"instance_name", _instance_name}, + {"log_message", "Cannot derive port number from scheme."}, + {"scheme", std::string(scheme)}, + }); + // clang-format on + THROW(SYS_CONFIG_FILE_ERR, "Cannot derive port number for AMQP endpoint."); + } + } + else { + const auto& port_val = port_cfg->get_ref(); + if (port_val > UINT16_MAX) { + // clang-format off + log_re::error({ + {"rule_engine_plugin", rule_engine_name}, + {"instance_name", _instance_name}, + {"log_message", "port must not exceed 65535."}, + {"port", std::to_string(port_val)} + }); + // clang-format on + THROW(SYS_CONFIG_FILE_ERR, "AMQP endpoint port greater than 65535."); + } + port = static_cast(port_val); + } + + const auto& topic = endpoint_cfg.at("topic").get_ref(); + + const auto user_cfg = endpoint_cfg.find("user"); + // clang-format off + const auto& user = user_cfg == endpoint_cfg.end() + ? amqp_endpoint::default_user + : user_cfg->get_ref(); + // clang-format on + + const auto password_cfg = endpoint_cfg.find("password"); + // clang-format off + const auto& password = password_cfg == endpoint_cfg.end() + ? amqp_endpoint::default_password + : password_cfg->get_ref(); + // clang-format on + + amqp_endpoints_val.emplace_back(scheme, host, port, topic, user, password); + } + } + + // if we didn't read anything from amqp_endpoints, check amqp_location and amqp_topic + const auto amqp_location_cfg = plugin_spec_cfg.find("amqp_location"); + const auto amqp_topic_cfg = plugin_spec_cfg.find("amqp_topic"); + if (amqp_endpoints_val.empty()) { + if (amqp_location_cfg == plugin_spec_cfg.end() || amqp_topic_cfg == plugin_spec_cfg.end()) { + // clang-format off + log_re::error({ + {"rule_engine_plugin", rule_engine_name}, + {"instance_name", _instance_name}, + {"log_message", "amqp_endpoints not present in rule engine configuration or empty."} + }); + // clang-format on + THROW(KEY_NOT_FOUND, "amqp_endpoints not present in rule engine configuration or empty."); + } + else if (!warned_amqp_location) { + // clang-format off + log_re::warn({ + {"rule_engine_plugin", rule_engine_name}, + {"instance_name", _instance_name}, + {"log_message", "Found amqp_location and/or amqp_topic configuration settings. These " + "settings have been deprecated in favor of amqp_endpoints and will be " + "ignored in future versions of the plugin."} + }); + // clang-format on + warned_amqp_location = true; + } + const auto& amqp_location = amqp_location_cfg->get_ref(); + const auto& amqp_topic = amqp_topic_cfg->get_ref(); + + const auto amqp_url_str = fmt::format(FMT_COMPILE("{0:s}/{1:s}"), amqp_location, amqp_topic); + // TODO: Use Boost.URL instead, once it's available +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + const proton::url amqp_url(amqp_url_str, false); +#pragma GCC diagnostic pop + std::string scheme = amqp_url.scheme(); + if (scheme.empty()) { + scheme = amqp_endpoint::default_scheme; + } + std::string host = amqp_url.host(); + if (host.empty()) { + // clang-format off + log_re::error({ + {"rule_engine_plugin", rule_engine_name}, + {"instance_name", _instance_name}, + {"log_message", "Could not get host from amqp_location"}, + {"amqp_location", amqp_location}, + }); + // clang-format on + THROW(SYS_CONFIG_FILE_ERR, "Cannot derive AMQP endpoint host from amqp_location."); + } + std::uint_fast16_t port; + if (amqp_url.port().empty()) { + if (scheme == "amqp") { + port = amqp_endpoint::default_amqp_port; + } + else if (scheme == "amqps") { + port = amqp_endpoint::default_amqps_port; + } + else { + // clang-format off + log_re::error({ + {"rule_engine_plugin", rule_engine_name}, + {"instance_name", _instance_name}, + {"log_message", "Cannot derive port number from scheme."}, + {"scheme", scheme}, + }); + // clang-format on + THROW(SYS_CONFIG_FILE_ERR, "Cannot derive port number for AMQP endpoint."); + } + } + else { + port = static_cast(amqp_url.port_int()); + } + + amqp_endpoints_val.emplace_back( + scheme, + host, + port, + amqp_topic, + amqp_url.user(), + amqp_url.password()); + } + else if (amqp_location_cfg != plugin_spec_cfg.end() || amqp_topic_cfg != plugin_spec_cfg.end()) { + if (!warned_amqp_location) { + // clang-format off + log_re::warn({ + {"rule_engine_plugin", rule_engine_name}, + {"instance_name", _instance_name}, + {"log_message", "Ignoring amqp_location and amqp_topic in favor of amqp_endpoints. These " + "settings have been deprecated and should be removed from the plugin ." + "configuration."} + }); + // clang-format on + warned_amqp_location = true; + } + } + std::swap(amqp_endpoints, amqp_endpoints_val); // test_mode is optional const auto test_mode_cfg = plugin_spec_cfg.find("test_mode"); @@ -210,8 +386,6 @@ namespace irods::plugin::rule_engine::audit_amqp nlohmann::json json_obj; - std::string msg_str; - try { std::uint64_t time_ms = ts_clock::now().time_since_epoch() / std::chrono::milliseconds(1); json_obj["@timestamp"] = time_ms; @@ -225,15 +399,13 @@ namespace irods::plugin::rule_engine::audit_amqp if (test_mode) { log_file_path = log_path_prefix / fmt::format(FMT_STRING("{0:08d}.txt"), pid); json_obj["log_file"] = log_file_path; + message_sender.enable_test_mode(log_file_path); + } + else { + message_sender.disable_test_mode(); } - msg_str = json_obj.dump(); - - proton::message msg(msg_str); - msg.content_type("application/json"); - msg.creation_time(proton::timestamp(static_cast(time_ms))); - send_handler handler(msg, audit_amqp_url); - proton::container(handler).run(); + message_sender.send_message(json_obj, time_ms); } catch (const irods::exception& e) { log_exception(e, "Caught iRODS exception", {"instance_name", _instance_name}); @@ -251,13 +423,6 @@ namespace irods::plugin::rule_engine::audit_amqp return ERROR(SYS_UNKNOWN_ERROR, "an unknown error occurred"); } - if (test_mode) { - if (!log_file_ofstream.is_open()) { - log_file_ofstream.open(log_file_path); - } - log_file_ofstream << msg_str << std::endl; - } - return SUCCESS(); } @@ -267,9 +432,6 @@ namespace irods::plugin::rule_engine::audit_amqp nlohmann::json json_obj; - std::string msg_str; - std::string log_file; - try { std::uint64_t time_ms = ts_clock::now().time_since_epoch() / std::chrono::milliseconds(1); json_obj["@timestamp"] = time_ms; @@ -282,13 +444,8 @@ namespace irods::plugin::rule_engine::audit_amqp json_obj["log_file"] = log_file_path; } - msg_str = json_obj.dump(); - - proton::message msg(msg_str); - msg.content_type("application/json"); - msg.creation_time(proton::timestamp(static_cast(time_ms))); - send_handler handler(msg, audit_amqp_url); - proton::container(handler).run(); + message_sender.send_message(json_obj, time_ms); + message_sender.disable_test_mode(); } catch (const irods::exception& e) { log_exception(e, "Caught iRODS exception", {"instance_name", _instance_name}); @@ -306,14 +463,6 @@ namespace irods::plugin::rule_engine::audit_amqp return ERROR(SYS_UNKNOWN_ERROR, "an unknown error occurred"); } - if (test_mode) { - if (!log_file_ofstream.is_open()) { - log_file_ofstream.open(log_file_path); - } - log_file_ofstream << msg_str << std::endl; - log_file_ofstream.close(); - } - return SUCCESS(); } @@ -360,8 +509,6 @@ namespace irods::plugin::rule_engine::audit_amqp nlohmann::json json_obj; - std::string msg_str; - std::string log_file; try { std::uint64_t time_ms = ts_clock::now().time_since_epoch() / std::chrono::milliseconds(1); @@ -424,13 +571,7 @@ namespace irods::plugin::rule_engine::audit_amqp } } - msg_str = json_obj.dump(); - - proton::message msg(msg_str); - msg.content_type("application/json"); - msg.creation_time(proton::timestamp(static_cast(time_ms))); - send_handler handler(msg, audit_amqp_url); - proton::container(handler).run(); + message_sender.send_message(json_obj, time_ms); } catch (const irods::exception& e) { log_exception(e, "Caught iRODS exception", {"rule_name", _rn}); @@ -448,13 +589,6 @@ namespace irods::plugin::rule_engine::audit_amqp return ERROR(SYS_UNKNOWN_ERROR, "an unknown error occurred"); } - if (test_mode) { - if (!log_file_ofstream.is_open()) { - log_file_ofstream.open(log_file_path); - } - log_file_ofstream << msg_str << std::endl; - } - return err; } } // namespace irods::plugin::rule_engine::audit_amqp