Fix delivery engine data sync: status updates, webhooks, suppression, and reliability #39
Caution: Review failed. Pull request was closed or merged during review.

📝 Walkthrough

Adds asynchronous delivery event emission to the SMTP engine (delivered, bounced, failed) with sent_emails lookups and optional auto-suppression; adds CircuitBreaker.ForceOpen, RedisQueue.ActiveMessagePaths, startup circuit-breaker warmup, orphan .eml cleanup, and extensive tests.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Engine as SMTP Delivery Engine
    participant DB as Database (sent_emails)
    participant Handler as Registered Event Handler
    participant Webhook as Webhook Dispatcher
    participant Suppress as Suppression Service
    Engine->>Handler: fireEvent(DeliveryEvent{SMTPMessageID, Status, Recipients, ...})
    Handler->>DB: lookup sent_emails WHERE message_id = SMTPMessageID
    alt status == delivered
        Handler->>Webhook: POST EventDelivered(timestamp, message_id, recipient)
    else status == bounced
        Handler->>Webhook: POST EventBounced(timestamp, message_id, recipient, reason, smtp_code)
        Handler->>Suppress: AddFromBounce(domain_id, recipient, reason)
        Suppress-->>Handler: result
    else status == failed
        Handler->>Webhook: POST EventFailed(timestamp, message_id, recipient, reason)
    end
    Note right of Handler: all webhook/suppression calls run asynchronously
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
…tempts
The delivery engine processed emails and logged results to delivery_log,
but never updated sent_emails.status — leaving every API-sent email
permanently stuck as "queued" in the send logs UI.
- Add updateSentEmailStatus() to update sent_emails on delivery
outcomes (delivered, bounced, failed)
- Add recordDeliveryAttempt() to populate delivery_attempts table
for the email detail timeline view
- Guard against status overwrites with WHERE status IN ('queued','sending')
- Truncate externally-sourced SMTP error strings to 1024 bytes
- Reject unknown statuses to prevent CHECK constraint violations
- Guard empty message ID paths to avoid wasteful queries
Fixes #32, fixes #33
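A minimal sketch of how such a guarded status write could look, assuming `e.db` is a `*sql.DB`; the column names `smtp_error` and `updated_at` are illustrative, and only the behaviors in the bullets above come from the commit:

```go
const maxSMTPErrorLen = 1024 // externally-sourced SMTP errors get truncated

// updateSentEmailStatus is a sketch, not the project's actual implementation.
func (e *Engine) updateSentEmailStatus(ctx context.Context, messageID, status, smtpErr string) error {
	if messageID == "" {
		return nil // empty message ID: skip the guaranteed-miss query
	}
	switch status {
	case "delivered", "bounced", "failed":
		// known terminal statuses only
	default:
		return fmt.Errorf("unknown status %q", status) // would violate the CHECK constraint
	}
	if len(smtpErr) > maxSMTPErrorLen {
		smtpErr = smtpErr[:maxSMTPErrorLen]
	}
	// The WHERE clause stops a late or duplicate event from overwriting
	// a status that has already reached a terminal state.
	_, err := e.db.ExecContext(ctx, `
		UPDATE sent_emails
		SET status = ?, smtp_error = ?, updated_at = ?
		WHERE message_id = ? AND status IN ('queued', 'sending')
	`, status, smtpErr, time.Now().UTC(), messageID)
	return err
}
```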
The delivery engine had no way to notify the API layer when emails were
delivered, bounced, or failed. Users could subscribe to these webhook
events but they never fired, and hard-bounced addresses kept receiving
new sends.
- Add DeliveryEvent struct and DeliveryEventHandler callback to engine
- Fire events at all three terminal delivery outcomes
- API server registers handler in NewServer to:
- Trigger webhooks for delivered/bounced/failed events
- Auto-add hard-bounced addresses to suppression list via
SuppressionService.AddFromBounce()
- Look up domain_id from sent_emails for proper webhook/suppression
scoping (skips non-API emails gracefully)
Fixes #34, fixes #35
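A sketch of the event plumbing this commit describes. The struct fields mirror the sequence diagram above; `registerDeliveryHandler`, `lookupDomainID`, and `dispatchWebhook` are hypothetical stand-ins for the API layer's real wiring:

```go
// DeliveryEvent: field set inferred from the sequence diagram, not the real code.
type DeliveryEvent struct {
	SMTPMessageID string
	Status        string // "delivered", "bounced", or "failed"
	Recipients    []string
	Reason        string
	SMTPCode      int
	Timestamp     time.Time
}

type DeliveryEventHandler func(ctx context.Context, ev DeliveryEvent)

// Roughly what NewServer might register on the engine.
func registerDeliveryHandler(engine *Engine, suppression *SuppressionService) {
	engine.SetEventHandler(func(ctx context.Context, ev DeliveryEvent) {
		domainID, ok := lookupDomainID(ctx, ev.SMTPMessageID) // sent_emails lookup
		if !ok {
			return // non-API email: nothing to scope webhooks or suppression to
		}
		dispatchWebhook(ctx, domainID, ev) // delivered/bounced/failed payloads
		if ev.Status == "bounced" {
			for _, rcpt := range ev.Recipients {
				suppression.AddFromBounce(ctx, domainID, rcpt, ev.Reason)
			}
		}
	})
}
```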
…ers on start

Two reliability fixes for the delivery engine:

1. Orphaned .eml files: when the process crashes between marking a message complete in Redis and deleting the file from disk, the file stays forever. Added a periodic cleanup in recoveryWorker that removes .eml files older than 7 days (matching retry_max_age) from the queue directory.

2. Circuit breaker state loss: all breaker state is in-memory and resets to closed on restart, causing a burst of delivery attempts to known-broken domains. Added warmupCircuitBreakers() that checks delivery_log for domains with 5+ consecutive recent failures and pre-opens their breakers. Also added ForceOpen() to CircuitBreaker with proper lastFailureTime stamping.

Fixes #37, fixes #38
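As a sketch, the `ForceOpen()` mentioned here could look like the following, assuming a mutex-guarded breaker; the field and constant names are illustrative:

```go
// ForceOpen pre-opens the breaker as if a failure threshold had just tripped.
func (cb *CircuitBreaker) ForceOpen() {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	cb.state = StateOpen
	// Stamping lastFailureTime is the important part: with a zero timestamp
	// the open -> half-open cooldown would count as already elapsed, and the
	// post-restart burst would get through anyway.
	cb.lastFailureTime = time.Now()
}
```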
12f1e44 to 3741de4
- Codex P1 — orphan cleanup was deleting files by age alone, which could remove .eml bodies for messages still pending retry in Redis. Now cross-references Redis via ActiveMessagePaths() and only deletes files not referenced by any pending/processing queue entry.
- Codex P2a — warmup query counted per-recipient delivery_log rows, so a single failed message to 5 recipients would inflate the count to 5 and incorrectly open the breaker. Now uses COUNT(DISTINCT message_id).
- Codex P2b — warmup only checked 'rejected' and 'bounced' statuses, missing 'deferred' (temporary failures like timeouts and 4xx). These also trip the live circuit breaker, so they should count at warmup too.
- CodeRabbit — fireEvent now dispatches async with panic recovery to avoid blocking delivery workers. SQLite-specific datetime() replaced with a Go time parameter. Scan error on the success-count query is now logged at debug level. SetEventHandler documented as must-call-before-Start().

Test updates: warmup test verifies per-recipient inflation is not counted, deferred status is included, and inflated.com stays closed. Orphan cleanup test updated for the Redis cross-reference. fireEvent test updated for async dispatch.
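A sketch of the cross-referencing cleanup, assuming `ActiveMessagePaths()` returns a `map[string]struct{}` of `.eml` paths still referenced in Redis; `e.queueDir` and the signature details are guesses:

```go
// cleanupOrphanedFiles: age alone is not enough; a file must also be
// unreferenced by any pending/processing queue entry before deletion.
func (e *Engine) cleanupOrphanedFiles(maxAge time.Duration) error {
	active, err := e.queue.ActiveMessagePaths(e.ctx)
	if err != nil {
		// Fail closed: a partial Redis read must never cause live files to be deleted.
		return fmt.Errorf("skipping orphan cleanup: %w", err)
	}
	cutoff := time.Now().Add(-maxAge)
	entries, err := os.ReadDir(e.queueDir)
	if err != nil {
		return err
	}
	for _, ent := range entries {
		if ent.IsDir() || filepath.Ext(ent.Name()) != ".eml" {
			continue
		}
		path := filepath.Join(e.queueDir, ent.Name())
		if _, live := active[path]; live {
			continue // still referenced in Redis; not an orphan
		}
		if info, err := ent.Info(); err == nil && info.ModTime().Before(cutoff) {
			os.Remove(path) // best-effort; the next pass retries
		}
	}
	return nil
}
```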
- P1 — ActiveMessagePaths() now fails closed: any Redis enumeration error returns an error instead of silently continuing, so cleanupOrphanedFiles skips cleanup entirely rather than deleting live message files from a partial read.
- P2a — Warmup query now uses GROUP BY message_id, attempt_number to correctly count retries of the same message as separate breaker executions, while still deduplicating per-recipient log rows.
- P2b — Warmup now inspects the most recent attempt sequence per domain instead of checking for "any success in the window". A success resets the consecutive failure count (matching live breaker behavior), so a domain with 1 success 40 min ago + 5 failures in the last 5 min correctly pre-opens, while 5 failures + 1 recent success correctly stays closed.

Tests updated with cases for: retry-based counting, per-recipient dedup, success-then-failure (relapsed), failure-then-success (recovered).
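The consecutive-failure rule reads naturally as a newest-first scan; a sketch of the loop inside `warmupCircuitBreakers()`, where `circuitBreakerFor` is a hypothetical accessor:

```go
// Inside warmupCircuitBreakers, after the per-domain query: rows arrive
// newest-first, so count failures until the first success, mirroring how
// a success resets the live breaker's counter.
consecutiveFailures := 0
for rows.Next() {
	var status string
	if err := rows.Scan(&status); err != nil {
		e.logger.Debug("warmup scan failed", "error", err)
		break
	}
	if status == "rejected" || status == "bounced" || status == "deferred" {
		consecutiveFailures++
		continue
	}
	break // a success ends the streak of most-recent failures
}
if consecutiveFailures >= 5 {
	e.circuitBreakerFor(domain).ForceOpen()
}
```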
- PII: replace raw email in suppression logs with message_id (CodeRabbit)
- Data race: store eventHandler as atomic.Value, since Start() is called before NewServer()/SetEventHandler() in main.go (CodeRabbit + Devin)
- SQL ordering: add ORDER BY on the outer SELECT in the warmup subquery, since SQL does not guarantee subquery ordering is preserved (Devin)
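The data-race fix boils down to the pattern below, a sketch assuming `eventHandler` is an `atomic.Value` field on `Engine` (`currentHandler` is an illustrative helper):

```go
// SetEventHandler may now race safely with delivery workers already running,
// since atomic.Value provides release/acquire semantics for the handler swap.
func (e *Engine) SetEventHandler(h DeliveryEventHandler) {
	e.eventHandler.Store(h) // always store the same concrete type
}

func (e *Engine) currentHandler() DeliveryEventHandler {
	h, _ := e.eventHandler.Load().(DeliveryEventHandler)
	return h // nil until SetEventHandler runs; fireEvent treats nil as a no-op
}
```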
🧹 Nitpick comments (2)
internal/smtp/delivery/delivery.go (2)
1066-1094: SQL query may return arbitrary status values.

The inner subquery groups by `(message_id, attempt_number)` and selects `status`, but `status` is not aggregated. Standard SQL leaves this undefined, and SQLite simply picks an arbitrary `status` from the group. Since each `(message_id, attempt_number)` group should have a single status across recipients, this is likely safe in practice, but it relies on undefined behavior.

Also note that the outer `ORDER BY latest DESC` duplicates the inner `ORDER BY` and may be redundant depending on SQL engine behavior with subqueries.

♻️ Suggested fix for explicit aggregation
```diff
 rows, err := e.db.QueryContext(e.ctx, `
-    SELECT status FROM (
-        SELECT status, MAX(created_at) as latest
+    SELECT status FROM (
+        SELECT MAX(status) as status, MAX(created_at) as latest
         FROM delivery_log
         WHERE domain = ? AND created_at > ?
         GROUP BY message_id, attempt_number
         ORDER BY latest DESC
         LIMIT 10
     ) sub
     ORDER BY latest DESC
 `, domain, oneHourAgo)
```

Or use `MIN(status)`; the key is being explicit about which status is selected when multiple recipients exist for the same `(message_id, attempt_number)`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/smtp/delivery/delivery.go` around lines 1066 - 1094, The SQL selects status without aggregation in the subquery inside the method using e.db.QueryContext (the block that builds recent attempts for a domain), which can return an arbitrary status for a grouped (message_id, attempt_number); change the subquery to explicitly aggregate the status (e.g., use MIN(status) or MAX(status) in the SELECT and adjust references accordingly) so the chosen status is deterministic, and remove the redundant inner ORDER BY if desired so only the outer ORDER BY latest DESC is used; update any variable names/comments near the QueryContext call and the rows scanning loop (consecutiveFailures logic) to reflect the aggregated column.
1332-1348: Potential unbounded goroutine growth under high throughput.

The async dispatch spawns a new goroutine per event without any backpressure mechanism. Under sustained high load, if the handler (which does synchronous DB queries per the context snippet) is slow, goroutines can accumulate.
Consider adding a bounded worker pool or semaphore to limit concurrent event handlers, or at minimum add metrics/logging when goroutine count is high.
♻️ Optional: Add bounded concurrency
```go
// Add to Engine struct:
// eventSem chan struct{} // initialized with capacity, e.g., 100

func (e *Engine) fireEvent(ctx context.Context, event DeliveryEvent) {
	h, _ := e.eventHandler.Load().(DeliveryEventHandler)
	if h == nil {
		return
	}

	// Optional: bounded concurrency
	// select {
	// case e.eventSem <- struct{}{}:
	// default:
	// 	e.logger.Warn("Event handler backpressure, dropping event", "message_id", event.SMTPMessageID)
	// 	return
	// }

	go func(ev DeliveryEvent) {
		defer func() {
			// <-e.eventSem // release semaphore
			if r := recover(); r != nil {
				e.logger.Error("Panic in delivery event handler",
					"error", fmt.Sprintf("%v", r),
					"message_id", ev.SMTPMessageID)
			}
		}()
		h(context.Background(), ev)
	}(event)
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/smtp/delivery/delivery.go` around lines 1332 - 1348, The fireEvent method on Engine currently spawns an unbounded goroutine per DeliveryEvent via the eventHandler which can lead to goroutine buildup under high throughput; modify Engine.fireEvent to limit concurrency by introducing a bounded semaphore/channel (e.g., eventSem on Engine) with a fixed capacity, acquire a slot before launching the handler (or drop/log the event when full), and always release the slot in the deferred recovery block; keep the existing panic recovery and use e.logger to warn when backpressure causes drops so you can observe when the semaphore is saturated (references: Engine.fireEvent, Engine.eventHandler, DeliveryEventHandler, eventSem, e.logger).
📒 Files selected for processing (4)
- internal/api/server.go
- internal/queue/redis.go
- internal/smtp/delivery/delivery.go
- internal/smtp/delivery/delivery_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/api/server.go
Bare column `status` in GROUP BY works on SQLite but fails on PostgreSQL with "must appear in GROUP BY or aggregate function". Since all rows in each (message_id, attempt_number) group share the same status, MAX() returns the correct value on all backends.
Summary
The delivery engine processed emails correctly but never communicated results back to the API layer. This caused a cascade of broken features:
- `sent_emails.status` was never updated after delivery (API send logs stuck on 'queued' — delivery engine never updates sent_emails #32; Harden delivery status sync against malicious SMTP responses #33)
- Only `queued` webhook events worked (Webhook events never fire for delivered, bounced, or failed emails #34)
- `AddFromBounce()` existed but was never called (Hard bounces don't auto-add to suppression list #35)

Changes
Commit 1: Status sync (`sent_emails` + `delivery_attempts`)

- `updateSentEmailStatus()` writes delivered/bounced/failed back to `sent_emails`
- `recordDeliveryAttempt()` populates `delivery_attempts` for the timeline view

Commit 2: Webhook events + auto-suppression

- `DeliveryEvent` struct and `DeliveryEventHandler` callback on the engine
- Webhooks fired for delivered/bounced/failed
- Hard bounces auto-suppressed via `AddFromBounce()`
- `domain_id` looked up from `sent_emails` for proper scoping; skips non-API emails

Commit 3: Orphan cleanup + circuit breaker warmup

- `cleanupOrphanedFiles()` removes `.eml` files older than 7 days from the queue dir, run by `recoveryWorker` on a 5-minute interval
- `warmupCircuitBreakers()` queries `delivery_log` on startup for domains with 5+ recent consecutive failures and pre-opens their breakers
- `ForceOpen()` added to `CircuitBreaker` with proper `lastFailureTime` stamping

Test plan

- `go build ./...` and `go vet ./...` pass
- Tests cover `SetEventHandler` post-construction registration
- Tests cover `ForceOpen` state transition and request rejection
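For the last two test-plan items, an assertion could look like this sketch; `NewCircuitBreaker` and `Allow` are assumed names, not necessarily the project's API:

```go
func TestForceOpenRejectsRequests(t *testing.T) {
	cb := NewCircuitBreaker(5, time.Minute) // hypothetical constructor
	cb.ForceOpen()
	if cb.Allow() { // hypothetical pre-delivery gate check
		t.Fatal("expected a force-opened breaker to reject requests")
	}
}
```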