-
Notifications
You must be signed in to change notification settings - Fork 2.3k
paymentsdb: restore sql payment store parity with kv #10721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
e8c58fc
53eea4d
5d9756f
4c163ac
96cbd4a
60d6b74
95e447f
412db8a
414fcc6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,6 @@ import ( | |
| "errors" | ||
| "fmt" | ||
| "math" | ||
| "sort" | ||
| "strconv" | ||
| "time" | ||
|
|
||
|
|
@@ -50,6 +49,7 @@ type SQLQueries interface { | |
| FilterPaymentsDesc(ctx context.Context, query sqlc.FilterPaymentsDescParams) ([]sqlc.FilterPaymentsDescRow, error) | ||
| FetchPayment(ctx context.Context, paymentIdentifier []byte) (sqlc.FetchPaymentRow, error) | ||
| FetchPaymentsByIDs(ctx context.Context, paymentIDs []int64) ([]sqlc.FetchPaymentsByIDsRow, error) | ||
| FetchNonTerminalPayments(ctx context.Context, arg sqlc.FetchNonTerminalPaymentsParams) ([]sqlc.FetchNonTerminalPaymentsRow, error) | ||
|
|
||
| CountPayments(ctx context.Context) (int64, error) | ||
|
|
||
|
|
@@ -1047,104 +1047,64 @@ func (s *SQLStore) FetchInFlightPayments(ctx context.Context) ([]*MPPayment, | |
| var mpPayments []*MPPayment | ||
|
|
||
| err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { | ||
| // Track which payment IDs we've already processed across all | ||
| // pages to avoid loading the same payment multiple times when | ||
| // multiple inflight attempts belong to the same payment. | ||
| processedPayments := make(map[int64]*MPPayment) | ||
| extractCursor := func( | ||
| row sqlc.FetchNonTerminalPaymentsRow) int64 { | ||
|
|
||
| extractCursor := func(row sqlc.PaymentHtlcAttempt) int64 { | ||
| return row.AttemptIndex | ||
| return row.ID | ||
| } | ||
|
|
||
| // collectFunc extracts the payment ID from each attempt row. | ||
| collectFunc := func(row sqlc.PaymentHtlcAttempt) ( | ||
| collectFunc := func(row sqlc.FetchNonTerminalPaymentsRow) ( | ||
| int64, error) { | ||
|
|
||
| return row.PaymentID, nil | ||
| return row.ID, nil | ||
| } | ||
|
|
||
| // batchDataFunc loads payment data for a batch of payment IDs, | ||
| // but only for IDs we haven't processed yet. | ||
| batchDataFunc := func(ctx context.Context, | ||
| paymentIDs []int64) (*paymentsCompleteData, error) { | ||
|
|
||
| // Filter out already-processed payment IDs. | ||
| uniqueIDs := make([]int64, 0, len(paymentIDs)) | ||
| for _, id := range paymentIDs { | ||
| _, processed := processedPayments[id] | ||
| if !processed { | ||
| uniqueIDs = append(uniqueIDs, id) | ||
| } | ||
| } | ||
| paymentIDs []int64) (*paymentsDetailsData, error) { | ||
|
|
||
| // If uniqueIDs is empty, the batch load will return | ||
| // empty batch data. | ||
| return batchLoadPayments( | ||
| ctx, s.cfg.QueryCfg, db, uniqueIDs, | ||
| return batchLoadPaymentDetailsData( | ||
| ctx, s.cfg.QueryCfg, db, paymentIDs, true, | ||
| ) | ||
| } | ||
|
|
||
| // processAttempt processes each attempt. We only build and | ||
| // store the payment once per unique payment ID. | ||
| processAttempt := func(ctx context.Context, | ||
| row sqlc.PaymentHtlcAttempt, | ||
| batchData *paymentsCompleteData) error { | ||
|
|
||
| // Skip if we've already processed this payment. | ||
| _, processed := processedPayments[row.PaymentID] | ||
| if processed { | ||
| return nil | ||
| } | ||
|
|
||
| dbPayment := batchData.paymentsAndIntents[row.PaymentID] | ||
| processPayment := func(ctx context.Context, | ||
| row sqlc.FetchNonTerminalPaymentsRow, | ||
| batchData *paymentsDetailsData) error { | ||
|
|
||
| // Build the payment from batch data. | ||
| mpPayment, err := buildPaymentFromBatchData( | ||
| dbPayment, batchData.paymentsDetailsData, true, | ||
| payment, err := buildPaymentFromBatchData( | ||
| row, batchData, true, | ||
| ) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to build payment: %w", | ||
| err) | ||
| } | ||
|
|
||
| // Store in our processed map. | ||
| processedPayments[row.PaymentID] = mpPayment | ||
| mpPayments = append(mpPayments, payment) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| queryFunc := func(ctx context.Context, lastAttemptIndex int64, | ||
| limit int32) ([]sqlc.PaymentHtlcAttempt, | ||
| queryFunc := func(ctx context.Context, lastPaymentID int64, | ||
| limit int32) ([]sqlc.FetchNonTerminalPaymentsRow, | ||
| error) { | ||
|
|
||
| return db.FetchAllInflightAttempts(ctx, | ||
| sqlc.FetchAllInflightAttemptsParams{ | ||
| AttemptIndex: lastAttemptIndex, | ||
| Limit: limit, | ||
| return db.FetchNonTerminalPayments(ctx, | ||
| sqlc.FetchNonTerminalPaymentsParams{ | ||
| ID: lastPaymentID, | ||
| Limit: limit, | ||
| }, | ||
| ) | ||
| } | ||
|
|
||
| err := sqldb.ExecuteCollectAndBatchWithSharedDataQuery( | ||
| ctx, s.cfg.QueryCfg, int64(-1), queryFunc, | ||
| ctx, s.cfg.QueryCfg, int64(0), queryFunc, | ||
| extractCursor, collectFunc, batchDataFunc, | ||
| processAttempt, | ||
| processPayment, | ||
| ) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Convert map to slice and sort by sequence number to | ||
| // produce a deterministic ordering. | ||
| mpPayments = make([]*MPPayment, 0, len(processedPayments)) | ||
| for _, payment := range processedPayments { | ||
| mpPayments = append(mpPayments, payment) | ||
| } | ||
| sort.Slice(mpPayments, func(i, j int) bool { | ||
| return mpPayments[i].SequenceNum < | ||
| mpPayments[j].SequenceNum | ||
| }) | ||
|
|
||
| return nil | ||
| }, func() { | ||
| mpPayments = nil | ||
|
|
@@ -1597,13 +1557,17 @@ func (s *SQLStore) RegisterAttempt(ctx context.Context, | |
| // Register the plain HTLC attempt next. | ||
| sessionKey := attempt.SessionKey() | ||
| sessionKeyBytes := sessionKey.Serialize() | ||
| attemptHash := paymentHash[:] | ||
| if attempt.Hash != nil { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we log an error in case the hash is
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea. I added |
||
| attemptHash = attempt.Hash[:] | ||
| } | ||
|
|
||
| _, err = db.InsertHtlcAttempt(ctx, sqlc.InsertHtlcAttemptParams{ | ||
| PaymentID: dbPayment.Payment.ID, | ||
| AttemptIndex: int64(attempt.AttemptID), | ||
| SessionKey: sessionKeyBytes, | ||
| AttemptTime: attempt.AttemptTime, | ||
| PaymentHash: paymentHash[:], | ||
| PaymentHash: attemptHash, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Worth noting that while the stored hash was wrong for AMP payments, this did not cause observable payment failures. Settlement is driven by the preimage arriving on the wire, and
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. I left the runtime behavior unchanged and treated this as a data-correctness fix rather than a live payment breakage. I only followed up with |
||
| FirstHopAmountMsat: int64( | ||
| attempt.Route.FirstHopAmount.Val.Int(), | ||
| ), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We no longer need this sorting step?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea we no longer need the old Go-side sorting step. The current path appends one payment per selector row, and the selector query already returns rows ordered by
p.id ASC