Skip to content

Commit a3ba1d8

Browse files
committed
try another thing
1 parent bcdf8fb commit a3ba1d8

2 files changed

Lines changed: 119 additions & 142 deletions

File tree

include/irods/private/amqp_sender.hpp

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55

66
#include <irods/irods_error.hpp>
77

8+
#include <chrono>
89
#include <cstdint>
910
#include <condition_variable>
1011
#include <fstream>
1112
#include <mutex>
1213
#include <optional>
14+
#include <semaphore>
15+
#include <shared_mutex>
1316
#include <string>
1417
#include <thread>
1518
#include <vector>
@@ -62,33 +65,28 @@ namespace irods::plugin::rule_engine::audit_amqp
6265
std::ofstream& test_log_ofstream);
6366

6467
void on_container_start(proton::container& container) override;
65-
void on_connection_open(proton::connection& connection) override;
66-
void on_sender_open(proton::sender& sender) override;
68+
void on_container_stop(proton::container& container) override;
69+
void on_transport_close(proton::transport& transport) override;
6770
void on_transport_error(proton::transport& transport) override;
71+
void on_connection_open(proton::connection& connection) override;
72+
void on_connection_close(proton::connection& connection) override;
6873
void on_connection_error(proton::connection& connection) override;
74+
void on_session_close(proton::session& session) override;
6975
void on_session_error(proton::session& session) override;
70-
void on_receiver_error(proton::receiver& receiver) override;
76+
void on_sender_open(proton::sender& sender) override;
77+
void on_sender_close(proton::sender& sender) override;
7178
void on_sender_error(proton::sender& sender) override;
79+
void on_tracker_accept(proton::tracker& tracker) override;
7280
void on_tracker_reject(proton::tracker& tracker) override;
81+
void on_tracker_release(proton::tracker& tracker) override;
82+
void on_tracker_settle(proton::tracker& tracker) override;
7383
void on_error(const proton::error_condition& err_cond) override;
7484

7585
#ifdef IRODS_AUDIT_EXTRA_TRACE
76-
void on_container_stop(proton::container& container) override;
7786
void on_sendable(proton::sender& sender) override;
7887
void on_transport_open(proton::transport& transport) override;
79-
void on_transport_close(proton::transport& transport) override;
80-
void on_connection_close(proton::connection& connection) override;
8188
void on_session_open(proton::session& session) override;
82-
void on_session_close(proton::session& session) override;
83-
void on_receiver_open(proton::receiver& receiver) override;
84-
void on_receiver_detach(proton::receiver& receiver) override;
85-
void on_receiver_close(proton::receiver& receiver) override;
8689
void on_sender_detach(proton::sender& sender) override;
87-
void on_sender_close(proton::sender& sender) override;
88-
void on_tracker_accept(proton::tracker& tracker) override;
89-
void on_tracker_release(proton::tracker& tracker) override;
90-
void on_tracker_settle(proton::tracker& tracker) override;
91-
void on_delivery_settle(proton::delivery& delivery) override;
9290
void on_sender_drain_start(proton::sender& sender) override;
9391
void on_receiver_drain_finish(proton::receiver& receiver) override;
9492
void on_connection_wake(proton::connection& connection) override;
@@ -111,12 +109,16 @@ namespace irods::plugin::rule_engine::audit_amqp
111109
std::optional<enum proton::target::durability_mode> _sender_durability_mode;
112110
std::optional<bool> _durable_messages;
113111

114-
std::mutex _proton_mutex;
115112
std::optional<std::thread> _proton_thread;
116113

117-
std::mutex _amqp_connection_mutex;
114+
std::mutex _amqp_send_mutex;
115+
116+
std::shared_mutex _amqp_connection_cv_mutex;
118117
std::condition_variable_any _amqp_connection_cv;
119-
bool _amqp_connection_mutex_locked;
118+
std::binary_semaphore _connection_semaphore;
119+
120+
std::shared_mutex _amqp_send_cv_mutex;
121+
std::condition_variable_any _amqp_send_cv;
120122

121123
std::optional<proton::container> _container;
122124
std::optional<proton::connection> _connection;

0 commit comments

Comments
 (0)