Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions lightning-liquidity/src/lsps2/payment_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,29 @@ impl PaymentQueue {
PaymentQueue { payments: Vec::new() }
}

fn payment_status(entry: &PaymentQueueEntry) -> (u64, usize) {
let total_expected_outbound_amount_msat =
entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum();
(total_expected_outbound_amount_msat, entry.htlcs.len())
}

pub(crate) fn add_htlc(&mut self, new_htlc: InterceptedHTLC) -> (u64, usize) {
if let Some(entry) = self
.payments
.iter()
.find(|entry| entry.htlcs.iter().any(|htlc| htlc.intercept_id == new_htlc.intercept_id))
{
debug_assert_eq!(entry.payment_hash, new_htlc.payment_hash);
return Self::payment_status(entry);
}

let payment =
self.payments.iter_mut().find(|entry| entry.payment_hash == new_htlc.payment_hash);
if let Some(entry) = payment {
// HTLCs within a payment should have the same payment hash.
debug_assert!(entry.htlcs.iter().all(|htlc| htlc.payment_hash == entry.payment_hash));
// The given HTLC should not already be present.
debug_assert!(entry
.htlcs
.iter()
.all(|htlc| htlc.intercept_id != new_htlc.intercept_id));
entry.htlcs.push(new_htlc);
let total_expected_outbound_amount_msat =
entry.htlcs.iter().map(|htlc| htlc.expected_outbound_amount_msat).sum();
(total_expected_outbound_amount_msat, entry.htlcs.len())
Self::payment_status(entry)
} else {
let expected_outbound_amount_msat = new_htlc.expected_outbound_amount_msat;
let entry =
Expand Down Expand Up @@ -127,6 +135,15 @@ mod tests {
(500_000_000, 2),
);

assert_eq!(
payment_queue.add_htlc(InterceptedHTLC {
intercept_id: InterceptId([2; 32]),
expected_outbound_amount_msat: 300_000_000,
payment_hash: PaymentHash([100; 32]),
}),
(500_000_000, 2),
);

let expected_entry = PaymentQueueEntry {
payment_hash: PaymentHash([100; 32]),
htlcs: vec![
Expand Down
190 changes: 190 additions & 0 deletions lightning-liquidity/src/lsps2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,26 @@ impl PeerState {
});
}

fn remove_terminal_channel_state(&mut self, channel_id: ChannelId) -> Option<u64> {
let intercept_scid = self.intercept_scid_by_channel_id.get(&channel_id).copied()?;
let should_remove = self
.outbound_channels_by_intercept_scid
.get(&intercept_scid)
.and_then(|entry| entry.get_channel_id())
.is_some_and(|existing_channel_id| existing_channel_id == channel_id);

if !should_remove {
return None;
}

self.outbound_channels_by_intercept_scid.remove(&intercept_scid);
self.intercept_scid_by_channel_id.remove(&channel_id);
self.intercept_scid_by_user_channel_id.retain(|_, iscid| *iscid != intercept_scid);
self.needs_persist = true;

Some(intercept_scid)
}

fn pending_requests_and_channels(&self) -> usize {
let pending_requests = self.pending_requests.len();
let pending_outbound_channels = self
Expand Down Expand Up @@ -1252,6 +1272,43 @@ where
Ok(())
}

/// Forward [`Event::ChannelClosed`] event parameter into this function.
///
/// Will prune terminal JIT channel state once the corresponding channel has closed.
///
/// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed
pub async fn channel_closed(&self, channel_id: ChannelId) -> Result<(), APIError> {
let counterparty_node_id =
self.peer_by_channel_id.read().unwrap().get(&channel_id).copied();
let Some(counterparty_node_id) = counterparty_node_id else {
return Ok(());
};

let removed_intercept_scid = {
let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(&counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state = inner_state_lock.lock().unwrap();
peer_state.remove_terminal_channel_state(channel_id)
},
None => None,
}
};

if let Some(intercept_scid) = removed_intercept_scid {
self.peer_by_intercept_scid.write().unwrap().remove(&intercept_scid);
self.peer_by_channel_id.write().unwrap().remove(&channel_id);
self.persist().await.map_err(|e| APIError::APIMisuseError {
err: format!(
"Failed to persist peer state after channel {} closed: {}",
channel_id, e
),
})?;
}

Ok(())
}

/// Abandons a pending JIT‐open flow for `user_channel_id`, removing all local state.
///
/// This removes the intercept SCID, any outbound channel state, and associated
Expand Down Expand Up @@ -2270,6 +2327,25 @@ where
}
}

/// Forward [`Event::ChannelClosed`] event parameter into this function.
///
/// Wraps [`LSPS2ServiceHandler::channel_closed`].
///
/// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed
pub fn channel_closed(&self, channel_id: ChannelId) -> Result<(), APIError> {
let mut fut = pin!(self.inner.channel_closed(channel_id));

let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
// In a sync context, we can't wait for the future to complete.
unreachable!("Should not be pending in a sync context");
},
}
}

/// Wraps [`LSPS2ServiceHandler::channel_needs_manual_broadcast`].
pub fn channel_needs_manual_broadcast(
&self, user_channel_id: u128, counterparty_node_id: &PublicKey,
Expand Down Expand Up @@ -2361,6 +2437,8 @@ mod tests {

use bitcoin::{absolute::LockTime, transaction::Version};
use core::str::FromStr;
use lightning::io::Cursor;
use lightning::util::ser::{Readable, Writeable};

const MAX_VALUE_MSAT: u64 = 21_000_000_0000_0000_000;

Expand Down Expand Up @@ -2764,6 +2842,118 @@ mod tests {
}
}

#[test]
fn replayed_intercepted_htlc_after_persist_is_idempotent() {
let payment_size_msat = Some(500_000_000);
let opening_fee_params = LSPS2OpeningFeeParams {
min_fee_msat: 10_000_000,
proportional: 10_000,
valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
min_lifetime: 4032,
max_client_to_self_delay: 2016,
min_payment_size_msat: 10_000_000,
max_payment_size_msat: 1_000_000_000,
promise: "ignore".to_string(),
};
let intercept_scid = 42;
let user_channel_id = 43;
let htlc = InterceptedHTLC {
intercept_id: InterceptId([1; 32]),
expected_outbound_amount_msat: 500_000_000,
payment_hash: PaymentHash([2; 32]),
};

let mut jit_channel =
OutboundJITChannel::new(payment_size_msat, opening_fee_params, user_channel_id, false);
assert!(matches!(
jit_channel.htlc_intercepted(htlc).unwrap(),
Some(HTLCInterceptedAction::OpenChannel(_))
));

let mut peer_state = PeerState::new();
peer_state.intercept_scid_by_user_channel_id.insert(user_channel_id, intercept_scid);
peer_state.insert_outbound_channel(intercept_scid, jit_channel);

let encoded_peer_state = peer_state.encode();
let mut decoded_peer_state = PeerState::read(&mut Cursor::new(encoded_peer_state)).unwrap();
let decoded_jit_channel = decoded_peer_state
.outbound_channels_by_intercept_scid
.get_mut(&intercept_scid)
.unwrap();

assert!(decoded_jit_channel.htlc_intercepted(htlc).unwrap().is_none());

let ForwardPaymentAction(_, fee_payment) =
decoded_jit_channel.channel_ready(ChannelId([3; 32])).unwrap();
assert_eq!(fee_payment.htlcs, vec![htlc]);
}

#[test]
fn removes_terminal_state_for_closed_channel() {
let opening_fee_params = LSPS2OpeningFeeParams {
min_fee_msat: 10_000_000,
proportional: 10_000,
valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
min_lifetime: 4032,
max_client_to_self_delay: 2016,
min_payment_size_msat: 10_000_000,
max_payment_size_msat: 1_000_000_000,
promise: "ignore".to_string(),
};
let stale_intercept_scid = 42;
let stale_user_channel_id = 43;
let stale_channel_id = ChannelId([44; 32]);
let live_intercept_scid = 45;
let live_user_channel_id = 46;
let live_channel_id = ChannelId([47; 32]);

let mut stale_jit_channel =
OutboundJITChannel::new(None, opening_fee_params.clone(), stale_user_channel_id, false);
stale_jit_channel.state =
OutboundJITChannelState::PaymentForwarded { channel_id: stale_channel_id };
let mut live_jit_channel =
OutboundJITChannel::new(None, opening_fee_params, live_user_channel_id, false);
live_jit_channel.state =
OutboundJITChannelState::PaymentForwarded { channel_id: live_channel_id };

let mut peer_state = PeerState::new();
peer_state.insert_outbound_channel(stale_intercept_scid, stale_jit_channel);
peer_state.insert_outbound_channel(live_intercept_scid, live_jit_channel);
peer_state
.intercept_scid_by_user_channel_id
.insert(stale_user_channel_id, stale_intercept_scid);
peer_state
.intercept_scid_by_user_channel_id
.insert(live_user_channel_id, live_intercept_scid);
peer_state.intercept_scid_by_channel_id.insert(stale_channel_id, stale_intercept_scid);
peer_state.intercept_scid_by_channel_id.insert(live_channel_id, live_intercept_scid);
peer_state.needs_persist = false;

assert_eq!(
peer_state.remove_terminal_channel_state(stale_channel_id),
Some(stale_intercept_scid)
);
assert!(!peer_state
.outbound_channels_by_intercept_scid
.contains_key(&stale_intercept_scid));
assert!(peer_state.outbound_channels_by_intercept_scid.contains_key(&live_intercept_scid));
assert!(!peer_state.intercept_scid_by_user_channel_id.contains_key(&stale_user_channel_id));
assert_eq!(
peer_state.intercept_scid_by_user_channel_id.get(&live_user_channel_id),
Some(&live_intercept_scid)
);
assert!(!peer_state.intercept_scid_by_channel_id.contains_key(&stale_channel_id));
assert_eq!(
peer_state.intercept_scid_by_channel_id.get(&live_channel_id),
Some(&live_intercept_scid)
);
assert!(peer_state.needs_persist);

peer_state.needs_persist = false;
assert_eq!(peer_state.remove_terminal_channel_state(stale_channel_id), None);
assert!(!peer_state.needs_persist);
}

#[test]
fn broadcast_not_allowed_after_non_paying_fee_payment_claimed() {
let min_fee_msat: u64 = 12345;
Expand Down
2 changes: 2 additions & 0 deletions lightning-liquidity/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,15 @@ where
/// - [`Event::ChannelReady`] to [`LSPS2ServiceHandler::channel_ready`]
/// - [`Event::HTLCHandlingFailed`] to [`LSPS2ServiceHandler::htlc_handling_failed`]
/// - [`Event::PaymentForwarded`] to [`LSPS2ServiceHandler::payment_forwarded`]
/// - [`Event::ChannelClosed`] to [`LSPS2ServiceHandler::channel_closed`]
///
/// [`PeerManager`]: lightning::ln::peer_handler::PeerManager
/// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler
/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
/// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed
/// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded
/// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed
pub struct LiquidityManager<
ES: EntropySource + Clone,
NS: NodeSigner + Clone,
Expand Down
Loading