From 76eac43bbfd5a0427dba810b3cca8776a8174a9a Mon Sep 17 00:00:00 2001 From: tnull Date: Tue, 2 Jun 2026 14:20:33 +0200 Subject: [PATCH 1/3] Add LSPS2 replay regression coverage Persisting LSPS2 service state can race with replayed intercepted HTLC events after restart. Cover replaying the same intercepted HTLC after restoring peer state so duplicate queueing is caught. Co-Authored-By: HAL 9000 --- lightning-liquidity/src/lsps2/service.rs | 48 ++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 5f318fc077e..2e057d3253c 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -2361,6 +2361,8 @@ mod tests { use bitcoin::{absolute::LockTime, transaction::Version}; use core::str::FromStr; + use lightning::io::Cursor; + use lightning::util::ser::{Readable, Writeable}; const MAX_VALUE_MSAT: u64 = 21_000_000_0000_0000_000; @@ -2764,6 +2766,52 @@ mod tests { } } + #[test] + fn replayed_intercepted_htlc_after_persist_is_idempotent() { + let payment_size_msat = Some(500_000_000); + let opening_fee_params = LSPS2OpeningFeeParams { + min_fee_msat: 10_000_000, + proportional: 10_000, + valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(), + min_lifetime: 4032, + max_client_to_self_delay: 2016, + min_payment_size_msat: 10_000_000, + max_payment_size_msat: 1_000_000_000, + promise: "ignore".to_string(), + }; + let intercept_scid = 42; + let user_channel_id = 43; + let htlc = InterceptedHTLC { + intercept_id: InterceptId([1; 32]), + expected_outbound_amount_msat: 500_000_000, + payment_hash: PaymentHash([2; 32]), + }; + + let mut jit_channel = + OutboundJITChannel::new(payment_size_msat, opening_fee_params, user_channel_id, false); + assert!(matches!( + jit_channel.htlc_intercepted(htlc).unwrap(), + Some(HTLCInterceptedAction::OpenChannel(_)) + )); + + let mut peer_state = PeerState::new(); + peer_state.intercept_scid_by_user_channel_id.insert(user_channel_id, intercept_scid); + peer_state.insert_outbound_channel(intercept_scid, jit_channel); + + let encoded_peer_state = peer_state.encode(); + let mut decoded_peer_state = PeerState::read(&mut Cursor::new(encoded_peer_state)).unwrap(); + let decoded_jit_channel = decoded_peer_state + .outbound_channels_by_intercept_scid + .get_mut(&intercept_scid) + .unwrap(); + + assert!(decoded_jit_channel.htlc_intercepted(htlc).unwrap().is_none()); + + let ForwardPaymentAction(_, fee_payment) = + decoded_jit_channel.channel_ready(ChannelId([3; 32])).unwrap(); + assert_eq!(fee_payment.htlcs, vec![htlc]); + } + #[test] fn broadcast_not_allowed_after_non_paying_fee_payment_claimed() { let min_fee_msat: u64 = 12345; From 66da9f7ab07f248bde95076a7a665b40abd94e56 Mon Sep 17 00:00:00 2001 From: tnull Date: Tue, 2 Jun 2026 14:20:45 +0200 Subject: [PATCH 2/3] Treat replayed LSPS2 HTLCs idempotently Replayed intercepted HTLC events should not duplicate queued payments or panic after restart. Ignore already-queued intercept IDs so persisted queues remain stable across event replay. Co-Authored-By: HAL 9000 --- .../src/lsps2/payment_queue.rs | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/lightning-liquidity/src/lsps2/payment_queue.rs b/lightning-liquidity/src/lsps2/payment_queue.rs index 421e42d7706..600f588716c 100644 --- a/lightning-liquidity/src/lsps2/payment_queue.rs +++ b/lightning-liquidity/src/lsps2/payment_queue.rs @@ -26,21 +26,29 @@ impl PaymentQueue { PaymentQueue { payments: Vec::new() } } + fn payment_status(entry: &PaymentQueueEntry) -> (u64, usize) { + let total_expected_outbound_amount_msat = + entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum(); + (total_expected_outbound_amount_msat, entry.htlcs.len()) + } + pub(crate) fn add_htlc(&mut self, new_htlc: InterceptedHTLC) -> (u64, usize) { + if let Some(entry) = self + .payments + .iter() + .find(|entry| entry.htlcs.iter().any(|htlc| htlc.intercept_id == new_htlc.intercept_id)) + { + debug_assert_eq!(entry.payment_hash, new_htlc.payment_hash); + return Self::payment_status(entry); + } + let payment = self.payments.iter_mut().find(|entry| entry.payment_hash == new_htlc.payment_hash); if let Some(entry) = payment { // HTLCs within a payment should have the same payment hash. debug_assert!(entry.htlcs.iter().all(|htlc| htlc.payment_hash == entry.payment_hash)); - // The given HTLC should not already be present. - debug_assert!(entry - .htlcs - .iter() - .all(|htlc| htlc.intercept_id != new_htlc.intercept_id)); entry.htlcs.push(new_htlc); - let total_expected_outbound_amount_msat = - entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum(); - (total_expected_outbound_amount_msat, entry.htlcs.len()) + Self::payment_status(entry) } else { let expected_outbound_amount_msat = new_htlc.expected_outbound_amount_msat; let entry = @@ -127,6 +135,15 @@ mod tests { (500_000_000, 2), ); + assert_eq!( + payment_queue.add_htlc(InterceptedHTLC { + intercept_id: InterceptId([2; 32]), + expected_outbound_amount_msat: 300_000_000, + payment_hash: PaymentHash([100; 32]), + }), + (500_000_000, 2), + ); + let expected_entry = PaymentQueueEntry { payment_hash: PaymentHash([100; 32]), htlcs: vec![ From 444c0098e4ff6460498eddbc568e3015a2adc8c0 Mon Sep 17 00:00:00 2001 From: tnull Date: Tue, 2 Jun 2026 14:21:02 +0200 Subject: [PATCH 3/3] Prune closed LSPS2 terminal channel state Terminal JIT channel state is only useful while the forwarded channel still exists. Drop completed LSPS2 mappings once the channel is gone so persisted service state does not retain stale entries indefinitely. Co-Authored-By: HAL 9000 --- lightning-liquidity/src/lsps2/service.rs | 142 +++++++++++++++++++++++ lightning-liquidity/src/manager.rs | 2 + 2 files changed, 144 insertions(+) diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 2e057d3253c..e1373cfb18b 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -644,6 +644,26 @@ impl PeerState { }); } + fn remove_terminal_channel_state(&mut self, channel_id: ChannelId) -> Option { + let intercept_scid = self.intercept_scid_by_channel_id.get(&channel_id).copied()?; + let should_remove = self + .outbound_channels_by_intercept_scid + .get(&intercept_scid) + .and_then(|entry| entry.get_channel_id()) + .is_some_and(|existing_channel_id| existing_channel_id == channel_id); + + if !should_remove { + return None; + } + + self.outbound_channels_by_intercept_scid.remove(&intercept_scid); + self.intercept_scid_by_channel_id.remove(&channel_id); + self.intercept_scid_by_user_channel_id.retain(|_, iscid| *iscid != intercept_scid); + self.needs_persist = true; + + Some(intercept_scid) + } + fn pending_requests_and_channels(&self) -> usize { let pending_requests = self.pending_requests.len(); let pending_outbound_channels = self @@ -1252,6 +1272,43 @@ where Ok(()) } + /// Forward [`Event::ChannelClosed`] event parameter into this function. + /// + /// Will prune terminal JIT channel state once the corresponding channel has closed. + /// + /// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed + pub async fn channel_closed(&self, channel_id: ChannelId) -> Result<(), APIError> { + let counterparty_node_id = + self.peer_by_channel_id.read().unwrap().get(&channel_id).copied(); + let Some(counterparty_node_id) = counterparty_node_id else { + return Ok(()); + }; + + let removed_intercept_scid = { + let outer_state_lock = self.per_peer_state.read().unwrap(); + match outer_state_lock.get(&counterparty_node_id) { + Some(inner_state_lock) => { + let mut peer_state = inner_state_lock.lock().unwrap(); + peer_state.remove_terminal_channel_state(channel_id) + }, + None => None, + } + }; + + if let Some(intercept_scid) = removed_intercept_scid { + self.peer_by_intercept_scid.write().unwrap().remove(&intercept_scid); + self.peer_by_channel_id.write().unwrap().remove(&channel_id); + self.persist().await.map_err(|e| APIError::APIMisuseError { + err: format!( + "Failed to persist peer state after channel {} closed: {}", + channel_id, e + ), + })?; + } + + Ok(()) + } + /// Abandons a pending JIT‐open flow for `user_channel_id`, removing all local state. /// /// This removes the intercept SCID, any outbound channel state, and associated @@ -2270,6 +2327,25 @@ where } } + /// Forward [`Event::ChannelClosed`] event parameter into this function. + /// + /// Wraps [`LSPS2ServiceHandler::channel_closed`]. + /// + /// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed + pub fn channel_closed(&self, channel_id: ChannelId) -> Result<(), APIError> { + let mut fut = pin!(self.inner.channel_closed(channel_id)); + + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match fut.as_mut().poll(&mut ctx) { + task::Poll::Ready(result) => result, + task::Poll::Pending => { + // In a sync context, we can't wait for the future to complete. + unreachable!("Should not be pending in a sync context"); + }, + } + } + /// Wraps [`LSPS2ServiceHandler::channel_needs_manual_broadcast`]. pub fn channel_needs_manual_broadcast( &self, user_channel_id: u128, counterparty_node_id: &PublicKey, @@ -2812,6 +2888,72 @@ mod tests { assert_eq!(fee_payment.htlcs, vec![htlc]); } + #[test] + fn removes_terminal_state_for_closed_channel() { + let opening_fee_params = LSPS2OpeningFeeParams { + min_fee_msat: 10_000_000, + proportional: 10_000, + valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(), + min_lifetime: 4032, + max_client_to_self_delay: 2016, + min_payment_size_msat: 10_000_000, + max_payment_size_msat: 1_000_000_000, + promise: "ignore".to_string(), + }; + let stale_intercept_scid = 42; + let stale_user_channel_id = 43; + let stale_channel_id = ChannelId([44; 32]); + let live_intercept_scid = 45; + let live_user_channel_id = 46; + let live_channel_id = ChannelId([47; 32]); + + let mut stale_jit_channel = + OutboundJITChannel::new(None, opening_fee_params.clone(), stale_user_channel_id, false); + stale_jit_channel.state = + OutboundJITChannelState::PaymentForwarded { channel_id: stale_channel_id }; + let mut live_jit_channel = + OutboundJITChannel::new(None, opening_fee_params, live_user_channel_id, false); + live_jit_channel.state = + OutboundJITChannelState::PaymentForwarded { channel_id: live_channel_id }; + + let mut peer_state = PeerState::new(); + peer_state.insert_outbound_channel(stale_intercept_scid, stale_jit_channel); + peer_state.insert_outbound_channel(live_intercept_scid, live_jit_channel); + peer_state + .intercept_scid_by_user_channel_id + .insert(stale_user_channel_id, stale_intercept_scid); + peer_state + .intercept_scid_by_user_channel_id + .insert(live_user_channel_id, live_intercept_scid); + peer_state.intercept_scid_by_channel_id.insert(stale_channel_id, stale_intercept_scid); + peer_state.intercept_scid_by_channel_id.insert(live_channel_id, live_intercept_scid); + peer_state.needs_persist = false; + + assert_eq!( + peer_state.remove_terminal_channel_state(stale_channel_id), + Some(stale_intercept_scid) + ); + assert!(!peer_state + .outbound_channels_by_intercept_scid + .contains_key(&stale_intercept_scid)); + assert!(peer_state.outbound_channels_by_intercept_scid.contains_key(&live_intercept_scid)); + assert!(!peer_state.intercept_scid_by_user_channel_id.contains_key(&stale_user_channel_id)); + assert_eq!( + peer_state.intercept_scid_by_user_channel_id.get(&live_user_channel_id), + Some(&live_intercept_scid) + ); + assert!(!peer_state.intercept_scid_by_channel_id.contains_key(&stale_channel_id)); + assert_eq!( + peer_state.intercept_scid_by_channel_id.get(&live_channel_id), + Some(&live_intercept_scid) + ); + assert!(peer_state.needs_persist); + + peer_state.needs_persist = false; + assert_eq!(peer_state.remove_terminal_channel_state(stale_channel_id), None); + assert!(!peer_state.needs_persist); + } + #[test] fn broadcast_not_allowed_after_non_paying_fee_payment_claimed() { let min_fee_msat: u64 = 12345; diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index f1b098dbfaa..9accd9e5769 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -256,6 +256,7 @@ where /// - [`Event::ChannelReady`] to [`LSPS2ServiceHandler::channel_ready`] /// - [`Event::HTLCHandlingFailed`] to [`LSPS2ServiceHandler::htlc_handling_failed`] /// - [`Event::PaymentForwarded`] to [`LSPS2ServiceHandler::payment_forwarded`] +/// - [`Event::ChannelClosed`] to [`LSPS2ServiceHandler::channel_closed`] /// /// [`PeerManager`]: lightning::ln::peer_handler::PeerManager /// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler @@ -263,6 +264,7 @@ where /// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady /// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed /// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded +/// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed pub struct LiquidityManager< ES: EntropySource + Clone, NS: NodeSigner + Clone,