diff --git a/lightning-liquidity/src/lsps2/payment_queue.rs b/lightning-liquidity/src/lsps2/payment_queue.rs index 421e42d7706..600f588716c 100644 --- a/lightning-liquidity/src/lsps2/payment_queue.rs +++ b/lightning-liquidity/src/lsps2/payment_queue.rs @@ -26,21 +26,29 @@ impl PaymentQueue { PaymentQueue { payments: Vec::new() } } + fn payment_status(entry: &PaymentQueueEntry) -> (u64, usize) { + let total_expected_outbound_amount_msat = + entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum(); + (total_expected_outbound_amount_msat, entry.htlcs.len()) + } + pub(crate) fn add_htlc(&mut self, new_htlc: InterceptedHTLC) -> (u64, usize) { + if let Some(entry) = self + .payments + .iter() + .find(|entry| entry.htlcs.iter().any(|htlc| htlc.intercept_id == new_htlc.intercept_id)) + { + debug_assert_eq!(entry.payment_hash, new_htlc.payment_hash); + return Self::payment_status(entry); + } + let payment = self.payments.iter_mut().find(|entry| entry.payment_hash == new_htlc.payment_hash); if let Some(entry) = payment { // HTLCs within a payment should have the same payment hash. debug_assert!(entry.htlcs.iter().all(|htlc| htlc.payment_hash == entry.payment_hash)); - // The given HTLC should not already be present. - debug_assert!(entry - .htlcs - .iter() - .all(|htlc| htlc.intercept_id != new_htlc.intercept_id)); entry.htlcs.push(new_htlc); - let total_expected_outbound_amount_msat = - entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum(); - (total_expected_outbound_amount_msat, entry.htlcs.len()) + Self::payment_status(entry) } else { let expected_outbound_amount_msat = new_htlc.expected_outbound_amount_msat; let entry = @@ -127,6 +135,15 @@ mod tests { (500_000_000, 2), ); + assert_eq!( + payment_queue.add_htlc(InterceptedHTLC { + intercept_id: InterceptId([2; 32]), + expected_outbound_amount_msat: 300_000_000, + payment_hash: PaymentHash([100; 32]), + }), + (500_000_000, 2), + ); + let expected_entry = PaymentQueueEntry { payment_hash: PaymentHash([100; 32]), htlcs: vec![ diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 5f318fc077e..e1373cfb18b 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -644,6 +644,26 @@ impl PeerState { }); } + fn remove_terminal_channel_state(&mut self, channel_id: ChannelId) -> Option { + let intercept_scid = self.intercept_scid_by_channel_id.get(&channel_id).copied()?; + let should_remove = self + .outbound_channels_by_intercept_scid + .get(&intercept_scid) + .and_then(|entry| entry.get_channel_id()) + .is_some_and(|existing_channel_id| existing_channel_id == channel_id); + + if !should_remove { + return None; + } + + self.outbound_channels_by_intercept_scid.remove(&intercept_scid); + self.intercept_scid_by_channel_id.remove(&channel_id); + self.intercept_scid_by_user_channel_id.retain(|_, iscid| *iscid != intercept_scid); + self.needs_persist = true; + + Some(intercept_scid) + } + fn pending_requests_and_channels(&self) -> usize { let pending_requests = self.pending_requests.len(); let pending_outbound_channels = self @@ -1252,6 +1272,43 @@ where Ok(()) } + /// Forward [`Event::ChannelClosed`] event parameter into this function. + /// + /// Will prune terminal JIT channel state once the corresponding channel has closed. + /// + /// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed + pub async fn channel_closed(&self, channel_id: ChannelId) -> Result<(), APIError> { + let counterparty_node_id = + self.peer_by_channel_id.read().unwrap().get(&channel_id).copied(); + let Some(counterparty_node_id) = counterparty_node_id else { + return Ok(()); + }; + + let removed_intercept_scid = { + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(&counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); + peer_state.remove_terminal_channel_state(channel_id) + }, + None => None, + } + }; + + if let Some(intercept_scid) = removed_intercept_scid { + self.peer_by_intercept_scid.write().unwrap().remove(&intercept_scid); + self.peer_by_channel_id.write().unwrap().remove(&channel_id); + self.persist().await.map_err(|e| APIError::APIMisuseError { + err: format!( + "Failed to persist peer state after channel {} closed: {}", + channel_id, e + ), + })?; + } + + Ok(()) + } + /// Abandons a pending JIT‐open flow for `user_channel_id`, removing all local state. /// /// This removes the intercept SCID, any outbound channel state, and associated @@ -2270,6 +2327,25 @@ where } } + /// Forward [`Event::ChannelClosed`] event parameter into this function. + /// + /// Wraps [`LSPS2ServiceHandler::channel_closed`]. + /// + /// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed + pub fn channel_closed(&self, channel_id: ChannelId) -> Result<(), APIError> { + let mut fut = pin!(self.inner.channel_closed(channel_id)); + + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match fut.as_mut().poll(&mut ctx) { + task::Poll::Ready(result) => result, + task::Poll::Pending => { + // In a sync context, we can't wait for the future to complete. + unreachable!("Should not be pending in a sync context"); + }, + } + } + /// Wraps [`LSPS2ServiceHandler::channel_needs_manual_broadcast`]. pub fn channel_needs_manual_broadcast( &self, user_channel_id: u128, counterparty_node_id: &PublicKey, @@ -2361,6 +2437,8 @@ mod tests { use bitcoin::{absolute::LockTime, transaction::Version}; use core::str::FromStr; + use lightning::io::Cursor; + use lightning::util::ser::{Readable, Writeable}; const MAX_VALUE_MSAT: u64 = 21_000_000_0000_0000_000; @@ -2764,6 +2842,118 @@ mod tests { } } + #[test] + fn replayed_intercepted_htlc_after_persist_is_idempotent() { + let payment_size_msat = Some(500_000_000); + let opening_fee_params = LSPS2OpeningFeeParams { + min_fee_msat: 10_000_000, + proportional: 10_000, + valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(), + min_lifetime: 4032, + max_client_to_self_delay: 2016, + min_payment_size_msat: 10_000_000, + max_payment_size_msat: 1_000_000_000, + promise: "ignore".to_string(), + }; + let intercept_scid = 42; + let user_channel_id = 43; + let htlc = InterceptedHTLC { + intercept_id: InterceptId([1; 32]), + expected_outbound_amount_msat: 500_000_000, + payment_hash: PaymentHash([2; 32]), + }; + + let mut jit_channel = + OutboundJITChannel::new(payment_size_msat, opening_fee_params, user_channel_id, false); + assert!(matches!( + jit_channel.htlc_intercepted(htlc).unwrap(), + Some(HTLCInterceptedAction::OpenChannel(_)) + )); + + let mut peer_state = PeerState::new(); + peer_state.intercept_scid_by_user_channel_id.insert(user_channel_id, intercept_scid); + peer_state.insert_outbound_channel(intercept_scid, jit_channel); + + let encoded_peer_state = peer_state.encode(); + let mut decoded_peer_state = PeerState::read(&mut Cursor::new(encoded_peer_state)).unwrap(); + let decoded_jit_channel = decoded_peer_state + .outbound_channels_by_intercept_scid + .get_mut(&intercept_scid) + .unwrap(); + + assert!(decoded_jit_channel.htlc_intercepted(htlc).unwrap().is_none()); + + let ForwardPaymentAction(_, fee_payment) = + decoded_jit_channel.channel_ready(ChannelId([3; 32])).unwrap(); + assert_eq!(fee_payment.htlcs, vec![htlc]); + } + + #[test] + fn removes_terminal_state_for_closed_channel() { + let opening_fee_params = LSPS2OpeningFeeParams { + min_fee_msat: 10_000_000, + proportional: 10_000, + valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(), + min_lifetime: 4032, + max_client_to_self_delay: 2016, + min_payment_size_msat: 10_000_000, + max_payment_size_msat: 1_000_000_000, + promise: "ignore".to_string(), + }; + let stale_intercept_scid = 42; + let stale_user_channel_id = 43; + let stale_channel_id = ChannelId([44; 32]); + let live_intercept_scid = 45; + let live_user_channel_id = 46; + let live_channel_id = ChannelId([47; 32]); + + let mut stale_jit_channel = + OutboundJITChannel::new(None, opening_fee_params.clone(), stale_user_channel_id, false); + stale_jit_channel.state = + OutboundJITChannelState::PaymentForwarded { channel_id: stale_channel_id }; + let mut live_jit_channel = + OutboundJITChannel::new(None, opening_fee_params, live_user_channel_id, false); + live_jit_channel.state = + OutboundJITChannelState::PaymentForwarded { channel_id: live_channel_id }; + + let mut peer_state = PeerState::new(); + peer_state.insert_outbound_channel(stale_intercept_scid, stale_jit_channel); + peer_state.insert_outbound_channel(live_intercept_scid, live_jit_channel); + peer_state + .intercept_scid_by_user_channel_id + .insert(stale_user_channel_id, stale_intercept_scid); + peer_state + .intercept_scid_by_user_channel_id + .insert(live_user_channel_id, live_intercept_scid); + peer_state.intercept_scid_by_channel_id.insert(stale_channel_id, stale_intercept_scid); + peer_state.intercept_scid_by_channel_id.insert(live_channel_id, live_intercept_scid); + peer_state.needs_persist = false; + + assert_eq!( + peer_state.remove_terminal_channel_state(stale_channel_id), + Some(stale_intercept_scid) + ); + assert!(!peer_state + .outbound_channels_by_intercept_scid + .contains_key(&stale_intercept_scid)); + assert!(peer_state.outbound_channels_by_intercept_scid.contains_key(&live_intercept_scid)); + assert!(!peer_state.intercept_scid_by_user_channel_id.contains_key(&stale_user_channel_id)); + assert_eq!( + peer_state.intercept_scid_by_user_channel_id.get(&live_user_channel_id), + Some(&live_intercept_scid) + ); + assert!(!peer_state.intercept_scid_by_channel_id.contains_key(&stale_channel_id)); + assert_eq!( + peer_state.intercept_scid_by_channel_id.get(&live_channel_id), + Some(&live_intercept_scid) + ); + assert!(peer_state.needs_persist); + + peer_state.needs_persist = false; + assert_eq!(peer_state.remove_terminal_channel_state(stale_channel_id), None); + assert!(!peer_state.needs_persist); + } + #[test] fn broadcast_not_allowed_after_non_paying_fee_payment_claimed() { let min_fee_msat: u64 = 12345; diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index f1b098dbfaa..9accd9e5769 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -256,6 +256,7 @@ where /// - [`Event::ChannelReady`] to [`LSPS2ServiceHandler::channel_ready`] /// - [`Event::HTLCHandlingFailed`] to [`LSPS2ServiceHandler::htlc_handling_failed`] /// - [`Event::PaymentForwarded`] to [`LSPS2ServiceHandler::payment_forwarded`] +/// - [`Event::ChannelClosed`] to [`LSPS2ServiceHandler::channel_closed`] /// /// [`PeerManager`]: lightning::ln::peer_handler::PeerManager /// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler @@ -263,6 +264,7 @@ where /// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady /// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed /// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded +/// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed pub struct LiquidityManager< ES: EntropySource + Clone, NS: NodeSigner + Clone,