|
7 | 7 | "errors" |
8 | 8 | "fmt" |
9 | 9 | "math" |
10 | | - "sort" |
11 | 10 | "strconv" |
12 | 11 | "time" |
13 | 12 |
|
@@ -50,12 +49,12 @@ type SQLQueries interface { |
50 | 49 | FilterPaymentsDesc(ctx context.Context, query sqlc.FilterPaymentsDescParams) ([]sqlc.FilterPaymentsDescRow, error) |
51 | 50 | FetchPayment(ctx context.Context, paymentIdentifier []byte) (sqlc.FetchPaymentRow, error) |
52 | 51 | FetchPaymentsByIDs(ctx context.Context, paymentIDs []int64) ([]sqlc.FetchPaymentsByIDsRow, error) |
| 52 | + FetchNonTerminalPayments(ctx context.Context, arg sqlc.FetchNonTerminalPaymentsParams) ([]sqlc.FetchNonTerminalPaymentsRow, error) |
53 | 53 |
|
54 | 54 | CountPayments(ctx context.Context) (int64, error) |
55 | 55 |
|
56 | 56 | FetchHtlcAttemptsForPayments(ctx context.Context, paymentIDs []int64) ([]sqlc.FetchHtlcAttemptsForPaymentsRow, error) |
57 | 57 | FetchHtlcAttemptResolutionsForPayments(ctx context.Context, paymentIDs []int64) ([]sqlc.FetchHtlcAttemptResolutionsForPaymentsRow, error) |
58 | | - FetchAllInflightAttempts(ctx context.Context, arg sqlc.FetchAllInflightAttemptsParams) ([]sqlc.PaymentHtlcAttempt, error) |
59 | 58 | FetchHopsForAttempts(ctx context.Context, htlcAttemptIndices []int64) ([]sqlc.FetchHopsForAttemptsRow, error) |
60 | 59 |
|
61 | 60 | FetchPaymentDuplicates(ctx context.Context, paymentID int64) ([]sqlc.PaymentDuplicate, error) |
@@ -182,88 +181,6 @@ func fetchPaymentWithCompleteData(ctx context.Context, |
182 | 181 | return buildPaymentFromBatchData(dbPayment, batchData, true) |
183 | 182 | } |
184 | 183 |
|
185 | | -// paymentsCompleteData holds the full payment data when batch loading base |
186 | | -// payment data and all the related data for a payment. |
187 | | -type paymentsCompleteData struct { |
188 | | - *paymentsBaseData |
189 | | - *paymentsDetailsData |
190 | | -} |
191 | | - |
192 | | -// batchLoadPayments loads the full payment data for a batch of payment IDs. |
193 | | -func batchLoadPayments(ctx context.Context, cfg *sqldb.QueryConfig, |
194 | | - db SQLQueries, paymentIDs []int64) (*paymentsCompleteData, error) { |
195 | | - |
196 | | - baseData, err := batchLoadpaymentsBaseData(ctx, cfg, db, paymentIDs) |
197 | | - if err != nil { |
198 | | - return nil, fmt.Errorf("failed to load payment base data: %w", |
199 | | - err) |
200 | | - } |
201 | | - |
202 | | - batchData, err := batchLoadPaymentDetailsData( |
203 | | - ctx, cfg, db, paymentIDs, true, |
204 | | - ) |
205 | | - if err != nil { |
206 | | - return nil, fmt.Errorf("failed to load payment batch data: %w", |
207 | | - err) |
208 | | - } |
209 | | - |
210 | | - return &paymentsCompleteData{ |
211 | | - paymentsBaseData: baseData, |
212 | | - paymentsDetailsData: batchData, |
213 | | - }, nil |
214 | | -} |
215 | | - |
216 | | -// paymentsBaseData holds the base payment and intent data for a batch of |
217 | | -// payments. |
218 | | -type paymentsBaseData struct { |
219 | | - // paymentsAndIntents maps payment ID to its payment and intent data. |
220 | | - paymentsAndIntents map[int64]sqlc.PaymentAndIntent |
221 | | -} |
222 | | - |
223 | | -// batchLoadpaymentsBaseData loads the base payment and payment intent data for |
224 | | -// a batch of payment IDs. This complements loadPaymentsBatchData which loads |
225 | | -// related data (attempts, hops, custom records) but not the payment table |
226 | | -// and payment intent table data. |
227 | | -func batchLoadpaymentsBaseData(ctx context.Context, |
228 | | - cfg *sqldb.QueryConfig, db SQLQueries, |
229 | | - paymentIDs []int64) (*paymentsBaseData, error) { |
230 | | - |
231 | | - baseData := &paymentsBaseData{ |
232 | | - paymentsAndIntents: make(map[int64]sqlc.PaymentAndIntent), |
233 | | - } |
234 | | - |
235 | | - if len(paymentIDs) == 0 { |
236 | | - return baseData, nil |
237 | | - } |
238 | | - |
239 | | - err := sqldb.ExecuteBatchQuery( |
240 | | - ctx, cfg, paymentIDs, |
241 | | - func(id int64) int64 { return id }, |
242 | | - func(ctx context.Context, ids []int64) ( |
243 | | - []sqlc.FetchPaymentsByIDsRow, error) { |
244 | | - |
245 | | - records, err := db.FetchPaymentsByIDs( |
246 | | - ctx, ids, |
247 | | - ) |
248 | | - |
249 | | - return records, err |
250 | | - }, |
251 | | - func(ctx context.Context, |
252 | | - payment sqlc.FetchPaymentsByIDsRow) error { |
253 | | - |
254 | | - baseData.paymentsAndIntents[payment.ID] = payment |
255 | | - |
256 | | - return nil |
257 | | - }, |
258 | | - ) |
259 | | - if err != nil { |
260 | | - return nil, fmt.Errorf("failed to fetch payment base "+ |
261 | | - "data: %w", err) |
262 | | - } |
263 | | - |
264 | | - return baseData, nil |
265 | | -} |
266 | | - |
267 | 184 | // paymentsRelatedData holds all the batch-loaded data for multiple payments. |
268 | 185 | // This does not include the base payment and intent data which is fetched |
269 | 186 | // separately. It includes the additional data like attempts, hops, hop custom |
@@ -1047,104 +964,64 @@ func (s *SQLStore) FetchInFlightPayments(ctx context.Context) ([]*MPPayment, |
1047 | 964 | var mpPayments []*MPPayment |
1048 | 965 |
|
1049 | 966 | err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { |
1050 | | - // Track which payment IDs we've already processed across all |
1051 | | - // pages to avoid loading the same payment multiple times when |
1052 | | - // multiple inflight attempts belong to the same payment. |
1053 | | - processedPayments := make(map[int64]*MPPayment) |
| 967 | + extractCursor := func( |
| 968 | + row sqlc.FetchNonTerminalPaymentsRow) int64 { |
1054 | 969 |
|
1055 | | - extractCursor := func(row sqlc.PaymentHtlcAttempt) int64 { |
1056 | | - return row.AttemptIndex |
| 970 | + return row.ID |
1057 | 971 | } |
1058 | 972 |
|
1059 | | - // collectFunc extracts the payment ID from each attempt row. |
1060 | | - collectFunc := func(row sqlc.PaymentHtlcAttempt) ( |
| 973 | + collectFunc := func(row sqlc.FetchNonTerminalPaymentsRow) ( |
1061 | 974 | int64, error) { |
1062 | 975 |
|
1063 | | - return row.PaymentID, nil |
| 976 | + return row.ID, nil |
1064 | 977 | } |
1065 | 978 |
|
1066 | | - // batchDataFunc loads payment data for a batch of payment IDs, |
1067 | | - // but only for IDs we haven't processed yet. |
1068 | 979 | batchDataFunc := func(ctx context.Context, |
1069 | | - paymentIDs []int64) (*paymentsCompleteData, error) { |
1070 | | - |
1071 | | - // Filter out already-processed payment IDs. |
1072 | | - uniqueIDs := make([]int64, 0, len(paymentIDs)) |
1073 | | - for _, id := range paymentIDs { |
1074 | | - _, processed := processedPayments[id] |
1075 | | - if !processed { |
1076 | | - uniqueIDs = append(uniqueIDs, id) |
1077 | | - } |
1078 | | - } |
| 980 | + paymentIDs []int64) (*paymentsDetailsData, error) { |
1079 | 981 |
|
1080 | | - // If uniqueIDs is empty, the batch load will return |
1081 | | - // empty batch data. |
1082 | | - return batchLoadPayments( |
1083 | | - ctx, s.cfg.QueryCfg, db, uniqueIDs, |
| 982 | + return batchLoadPaymentDetailsData( |
| 983 | + ctx, s.cfg.QueryCfg, db, paymentIDs, true, |
1084 | 984 | ) |
1085 | 985 | } |
1086 | 986 |
|
1087 | | - // processAttempt processes each attempt. We only build and |
1088 | | - // store the payment once per unique payment ID. |
1089 | | - processAttempt := func(ctx context.Context, |
1090 | | - row sqlc.PaymentHtlcAttempt, |
1091 | | - batchData *paymentsCompleteData) error { |
1092 | | - |
1093 | | - // Skip if we've already processed this payment. |
1094 | | - _, processed := processedPayments[row.PaymentID] |
1095 | | - if processed { |
1096 | | - return nil |
1097 | | - } |
1098 | | - |
1099 | | - dbPayment := batchData.paymentsAndIntents[row.PaymentID] |
| 987 | + processPayment := func(ctx context.Context, |
| 988 | + row sqlc.FetchNonTerminalPaymentsRow, |
| 989 | + batchData *paymentsDetailsData) error { |
1100 | 990 |
|
1101 | | - // Build the payment from batch data. |
1102 | | - mpPayment, err := buildPaymentFromBatchData( |
1103 | | - dbPayment, batchData.paymentsDetailsData, true, |
| 991 | + payment, err := buildPaymentFromBatchData( |
| 992 | + row, batchData, true, |
1104 | 993 | ) |
1105 | 994 | if err != nil { |
1106 | 995 | return fmt.Errorf("failed to build payment: %w", |
1107 | 996 | err) |
1108 | 997 | } |
1109 | 998 |
|
1110 | | - // Store in our processed map. |
1111 | | - processedPayments[row.PaymentID] = mpPayment |
| 999 | + mpPayments = append(mpPayments, payment) |
1112 | 1000 |
|
1113 | 1001 | return nil |
1114 | 1002 | } |
1115 | 1003 |
|
1116 | | - queryFunc := func(ctx context.Context, lastAttemptIndex int64, |
1117 | | - limit int32) ([]sqlc.PaymentHtlcAttempt, |
| 1004 | + queryFunc := func(ctx context.Context, lastPaymentID int64, |
| 1005 | + limit int32) ([]sqlc.FetchNonTerminalPaymentsRow, |
1118 | 1006 | error) { |
1119 | 1007 |
|
1120 | | - return db.FetchAllInflightAttempts(ctx, |
1121 | | - sqlc.FetchAllInflightAttemptsParams{ |
1122 | | - AttemptIndex: lastAttemptIndex, |
1123 | | - Limit: limit, |
| 1008 | + return db.FetchNonTerminalPayments(ctx, |
| 1009 | + sqlc.FetchNonTerminalPaymentsParams{ |
| 1010 | + ID: lastPaymentID, |
| 1011 | + Limit: limit, |
1124 | 1012 | }, |
1125 | 1013 | ) |
1126 | 1014 | } |
1127 | 1015 |
|
1128 | 1016 | err := sqldb.ExecuteCollectAndBatchWithSharedDataQuery( |
1129 | | - ctx, s.cfg.QueryCfg, int64(-1), queryFunc, |
| 1017 | + ctx, s.cfg.QueryCfg, int64(0), queryFunc, |
1130 | 1018 | extractCursor, collectFunc, batchDataFunc, |
1131 | | - processAttempt, |
| 1019 | + processPayment, |
1132 | 1020 | ) |
1133 | 1021 | if err != nil { |
1134 | 1022 | return err |
1135 | 1023 | } |
1136 | 1024 |
|
1137 | | - // Convert map to slice and sort by sequence number to |
1138 | | - // produce a deterministic ordering. |
1139 | | - mpPayments = make([]*MPPayment, 0, len(processedPayments)) |
1140 | | - for _, payment := range processedPayments { |
1141 | | - mpPayments = append(mpPayments, payment) |
1142 | | - } |
1143 | | - sort.Slice(mpPayments, func(i, j int) bool { |
1144 | | - return mpPayments[i].SequenceNum < |
1145 | | - mpPayments[j].SequenceNum |
1146 | | - }) |
1147 | | - |
1148 | 1025 | return nil |
1149 | 1026 | }, func() { |
1150 | 1027 | mpPayments = nil |
@@ -1597,13 +1474,21 @@ func (s *SQLStore) RegisterAttempt(ctx context.Context, |
1597 | 1474 | // Register the plain HTLC attempt next. |
1598 | 1475 | sessionKey := attempt.SessionKey() |
1599 | 1476 | sessionKeyBytes := sessionKey.Serialize() |
| 1477 | + attemptHash := paymentHash[:] |
| 1478 | + if attempt.Hash != nil { |
| 1479 | + attemptHash = attempt.Hash[:] |
| 1480 | + } else { |
| 1481 | + log.Errorf("RegisterAttempt: attempt %d has nil hash, "+ |
| 1482 | + "falling back to payment identifier %x", |
| 1483 | + attempt.AttemptID, paymentHash) |
| 1484 | + } |
1600 | 1485 |
|
1601 | 1486 | _, err = db.InsertHtlcAttempt(ctx, sqlc.InsertHtlcAttemptParams{ |
1602 | 1487 | PaymentID: dbPayment.Payment.ID, |
1603 | 1488 | AttemptIndex: int64(attempt.AttemptID), |
1604 | 1489 | SessionKey: sessionKeyBytes, |
1605 | 1490 | AttemptTime: attempt.AttemptTime, |
1606 | | - PaymentHash: paymentHash[:], |
| 1491 | + PaymentHash: attemptHash, |
1607 | 1492 | FirstHopAmountMsat: int64( |
1608 | 1493 | attempt.Route.FirstHopAmount.Val.Int(), |
1609 | 1494 | ), |
|
0 commit comments