
Fix delivery engine data sync: status updates, webhooks, suppression, and reliability#39

Merged
fenilsonani merged 7 commits into main from fix/delivery-status-sync
Mar 16, 2026

Conversation

@fenilsonani
Owner

@fenilsonani fenilsonani commented Mar 16, 2026

Summary

The delivery engine processed emails correctly but never communicated results back to the API layer. This broke a cascade of features: send logs stuck at "queued", delivery webhooks that never fired, and hard-bounced addresses that kept receiving new sends.

Changes

Commit 1: Status sync (sent_emails + delivery_attempts)

  • updateSentEmailStatus() writes delivered/bounced/failed back to sent_emails
  • recordDeliveryAttempt() populates delivery_attempts for the timeline view
  • Truncates externally-sourced SMTP error strings to 1024 bytes
  • Guards against empty message IDs and unknown status values
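The guards above can be sketched as plain Go. This is a minimal illustration, not the PR's actual code: the status set, function names, and the 1024-byte cap are taken from the description, and byte-level truncation may split a multi-byte rune (acceptable for error strings headed to a byte-limited column).

```go
package main

import "fmt"

// validStatuses mirrors the terminal states written back to sent_emails.
var validStatuses = map[string]bool{
	"delivered": true,
	"bounced":   true,
	"failed":    true,
}

// truncateString caps externally-sourced strings (e.g. SMTP error text)
// at max bytes so oversized values never reach the database.
func truncateString(s string, max int) string {
	if len(s) <= max {
		return s
	}
	return s[:max]
}

// guardStatusUpdate sketches the pre-write checks: reject empty message
// IDs and unknown statuses before issuing any query.
func guardStatusUpdate(messageID, status string) error {
	if messageID == "" {
		return fmt.Errorf("empty message ID")
	}
	if !validStatuses[status] {
		return fmt.Errorf("unknown status %q", status)
	}
	return nil
}

func main() {
	fmt.Println(guardStatusUpdate("", "delivered"))
	fmt.Println(guardStatusUpdate("abc", "queued"))
	fmt.Println(guardStatusUpdate("abc", "delivered"))
	fmt.Println(len(truncateString(string(make([]byte, 2000)), 1024))) // 1024
}
```

Rejecting unknown statuses up front keeps CHECK constraint violations out of the database layer entirely.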

Commit 2: Webhook events + auto-suppression

  • DeliveryEvent struct and DeliveryEventHandler callback on the engine
  • API server registers handler to fire delivered/bounced/failed webhooks
  • Bounced addresses auto-added to suppression list via AddFromBounce()
  • Looks up domain_id from sent_emails for proper scoping; skips non-API emails

Commit 3: Orphan cleanup + circuit breaker warmup

  • cleanupOrphanedFiles() removes .eml files older than 7 days from queue dir
  • Runs alongside the existing recoveryWorker on a 5-minute interval
  • warmupCircuitBreakers() queries delivery_log on startup for domains with 5+ recent consecutive failures and pre-opens their breakers
  • ForceOpen() added to CircuitBreaker with proper lastFailureTime stamping

Test plan

  • go build ./... and go vet ./... pass
  • All existing tests pass (no regressions)
  • 16 new tests added covering:
    • Status updates (delivered, bounced, failed, no-double-update, nil DB, empty ID, no match)
    • Delivery attempts recording (deferred + delivered timeline)
    • Truncation of long bounce reasons
    • Event handler firing and nil handler safety
    • SetEventHandler post-construction registration
    • Orphaned file cleanup (old vs recent vs non-eml)
    • Circuit breaker warmup (failing domain vs recovered domain vs nil DB)
    • ForceOpen state transition and request rejection
  • Deploy to staging and verify send logs show delivered/bounced status
  • Verify webhook events fire for delivery outcomes
  • Confirm bounced addresses appear in suppression list automatically


Summary by CodeRabbit

  • New Features

    • Delivery event webhooks for delivered, bounced, and failed messages (async).
    • Runtime registration/update of delivery event handlers and engine event emission.
    • Enumeration of active message paths to aid cleanup.
    • Circuit breaker can be programmatically forced open; startup warmup for resilience.
    • Orphaned message file cleanup and delivery-attempt recording.
  • Tests

    • Expanded tests covering delivery events, circuit breaker ForceOpen, cleanup, and status transitions.

@coderabbitai

coderabbitai Bot commented Mar 16, 2026

Caution

Review failed

Pull request was closed or merged during review

📝 Walkthrough

Walkthrough

Adds asynchronous delivery event emission to the SMTP engine (delivered, bounced, failed) with sent_emails lookups and optional auto-suppression; adds CircuitBreaker.ForceOpen, RedisQueue.ActiveMessagePaths, startup circuit-breaker warmup, orphan .eml cleanup, and extensive tests.

Changes

  • Delivery event integration — internal/api/server.go, internal/smtp/delivery/delivery.go, internal/smtp/delivery/delivery_test.go
    Introduce DeliveryEvent and DeliveryEventHandler; store handler in Engine; add WithEventHandler and SetEventHandler; emit async events on delivered/bounced/failed, update sent_emails, attempt auto-suppression for bounces, and add comprehensive delivery tests.
  • Circuit breaker — internal/resilience/circuitbreaker.go, internal/resilience/circuitbreaker_test.go
    Add public ForceOpen() to set last-failure time and transition the breaker to Open; add TestForceOpen ensuring Execute is rejected when open.
  • Queue & cleanup helpers — internal/queue/redis.go, internal/smtp/delivery/delivery.go
    Add RedisQueue.ActiveMessagePaths(ctx) to enumerate active message file paths; add cleanupOrphanedFiles, warmupCircuitBreakers, and supporting helpers (updateSentEmailStatus, recordDeliveryAttempt, fireEvent, truncateString) used by the delivery engine.
  • Tests & coverage — internal/smtp/delivery/delivery_test.go, internal/resilience/circuitbreaker_test.go
    Large suite of tests for event firing, handler safety, status transitions (delivered/bounced/failed), cleanup edge cases, circuit-breaker warmup, truncation, and DB/queue boundary conditions.

Sequence Diagram

sequenceDiagram
    participant Engine as SMTP Delivery Engine
    participant DB as Database (sent_emails)
    participant Handler as Registered Event Handler
    participant Webhook as Webhook Dispatcher
    participant Suppress as Suppression Service

    Engine->>Handler: fireEvent(DeliveryEvent{SMTPMessageID, Status, Recipients, ...})
    Handler->>DB: lookup sent_emails WHERE message_id = SMTPMessageID
    alt status == delivered
        Handler->>Webhook: POST EventDelivered(timestamp, message_id, recipient)
    else status == bounced
        Handler->>Webhook: POST EventBounced(timestamp, message_id, recipient, reason, smtp_code)
        Handler->>Suppress: AddFromBounce(domain_id, recipient, reason)
        Suppress-->>Handler: result
    else status == failed
        Handler->>Webhook: POST EventFailed(timestamp, message_id, recipient, reason)
    end
    Note right of Handler: all webhook/suppression calls run asynchronously

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐇 I hop through queues and dotted logs,
I nudge broken breakers, chase stray .eml frogs,
I whisper bounces, failures, and delight,
I fire webhooks softly through the night,
A tiny rabbit cheering delivery’s flight.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 24.14%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Title check — ✅ Passed: the title accurately and specifically summarizes the main changes: delivery engine data sync improvements including status updates, webhooks, suppression, and reliability enhancements.
  • Description check — ✅ Passed: the description follows the required template with complete Summary, Changes, Related Issues, and Testing sections; includes specific issue references, detailed commit breakdown, comprehensive test coverage, and checklist items.


Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.11.3)

Error: can't load config: unsupported version of the configuration: "" See https://golangci-lint.run/docs/product/migration-guide for migration instructions
The command is terminated due to an error: can't load config: unsupported version of the configuration: "" See https://golangci-lint.run/docs/product/migration-guide for migration instructions



…tempts

The delivery engine processed emails and logged results to delivery_log,
but never updated sent_emails.status — leaving every API-sent email
permanently stuck as "queued" in the send logs UI.

- Add updateSentEmailStatus() to update sent_emails on delivery
  outcomes (delivered, bounced, failed)
- Add recordDeliveryAttempt() to populate delivery_attempts table
  for the email detail timeline view
- Guard against status overwrites with WHERE status IN ('queued','sending')
- Truncate externally-sourced SMTP error strings to 1024 bytes
- Reject unknown statuses to prevent CHECK constraint violations
- Guard empty message ID paths to avoid wasteful queries

Fixes #32, fixes #33

The delivery engine had no way to notify the API layer when emails were
delivered, bounced, or failed. Users could subscribe to these webhook
events but they never fired, and hard-bounced addresses kept receiving
new sends.

- Add DeliveryEvent struct and DeliveryEventHandler callback to engine
- Fire events at all three terminal delivery outcomes
- API server registers handler in NewServer to:
  - Trigger webhooks for delivered/bounced/failed events
  - Auto-add hard-bounced addresses to suppression list via
    SuppressionService.AddFromBounce()
- Look up domain_id from sent_emails for proper webhook/suppression
  scoping (skips non-API emails gracefully)

Fixes #34, fixes #35
…ers on start

Two reliability fixes for the delivery engine:

1. Orphaned .eml files: when the process crashes between marking a
   message complete in Redis and deleting the file from disk, the file
   stays forever. Added a periodic cleanup in recoveryWorker that
   removes .eml files older than 7 days (matching retry_max_age) from
   the queue directory.

2. Circuit breaker state loss: all breaker state is in-memory and
   resets to closed on restart, causing a burst of delivery attempts
   to known-broken domains. Added warmupCircuitBreakers() that checks
   delivery_log for domains with 5+ consecutive recent failures and
   pre-opens their breakers. Also added ForceOpen() to CircuitBreaker
   with proper lastFailureTime stamping.

Fixes #37, fixes #38
@fenilsonani fenilsonani force-pushed the fix/delivery-status-sync branch from 12f1e44 to 3741de4 on March 16, 2026 05:11
coderabbitai[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

Codex P1 — orphan cleanup was deleting files by age alone, which could
remove .eml bodies for messages still pending retry in Redis. Now
cross-references Redis via ActiveMessagePaths() and only deletes files
not referenced by any pending/processing queue entry.

Codex P2a — warmup query counted per-recipient delivery_log rows, so a
single failed message to 5 recipients would inflate the count to 5 and
incorrectly open the breaker. Now uses COUNT(DISTINCT message_id).

Codex P2b — warmup only checked 'rejected' and 'bounced' statuses,
missing 'deferred' (temporary failures like timeouts and 4xx). These
also trip the live circuit breaker, so they should count at warmup too.

CodeRabbit — fireEvent now dispatches async with panic recovery to
avoid blocking delivery workers. SQLite-specific datetime() replaced
with Go time parameter. Scan error on success-count query is now
logged at debug level. SetEventHandler documented as must-call-before-
Start().

Test updates: warmup test verifies per-recipient inflation is not
counted, deferred status is included, and inflated.com stays closed.
Orphan cleanup test updated for Redis cross-reference. fireEvent test
updated for async dispatch.
devin-ai-integration[bot]

This comment was marked as resolved.

P1 — ActiveMessagePaths() now fails closed: any Redis enumeration error
returns an error instead of silently continuing, so cleanupOrphanedFiles
skips cleanup entirely rather than deleting live message files from a
partial read.

P2a — Warmup query now uses GROUP BY message_id, attempt_number to
correctly count retries of the same message as separate breaker
executions, while still deduplicating per-recipient log rows.

P2b — Warmup now inspects the most recent attempt sequence per domain
instead of checking for "any success in the window". A success resets
the consecutive failure count (matching live breaker behavior), so a
domain with 1 success 40min ago + 5 failures in the last 5min correctly
pre-opens, while 5 failures + 1 recent success correctly stays closed.

Tests updated with cases for: retry-based counting, per-recipient
dedup, success-then-failure (relapsed), failure-then-success (recovered).
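The "most recent attempt sequence" rule above reduces to a small pure function once the per-domain statuses are ordered newest-first: count failures until the first success, which resets the streak. The function name and status strings are assumptions consistent with the PR description.

```go
package main

import "fmt"

// consecutiveFailures walks attempt statuses ordered most-recent-first
// and counts failures up to the first success, matching the warmup
// rule: a success resets the consecutive-failure count.
func consecutiveFailures(statusesNewestFirst []string) int {
	n := 0
	for _, s := range statusesNewestFirst {
		switch s {
		case "rejected", "bounced", "deferred":
			n++
		default: // "delivered" (or any success) ends the streak
			return n
		}
	}
	return n
}

func main() {
	// Relapsed: 5 failures since the last success -> breaker pre-opens.
	relapsed := []string{"deferred", "rejected", "deferred", "bounced", "rejected", "delivered"}
	// Recovered: most recent attempt succeeded -> stays closed.
	recovered := []string{"delivered", "rejected", "rejected", "rejected", "rejected", "rejected"}
	fmt.Println(consecutiveFailures(relapsed) >= 5)  // true
	fmt.Println(consecutiveFailures(recovered) >= 5) // false
}
```

Including "deferred" in the failure set reflects the P2b fix: temporary failures trip the live breaker, so they must count at warmup too.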
devin-ai-integration[bot]

This comment was marked as resolved.

- PII: replace raw email in suppression logs with message_id (CodeRabbit)
- Data race: store eventHandler as atomic.Value since Start() is called
  before NewServer()/SetEventHandler() in main.go (CodeRabbit + Devin)
- SQL ordering: add ORDER BY on outer SELECT in warmup subquery since
  SQL does not guarantee subquery ordering is preserved (Devin)

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (2)
internal/smtp/delivery/delivery.go (2)

1066-1094: SQL query may return arbitrary status values.

The inner subquery groups by (message_id, attempt_number) and selects status without aggregating it. Standard SQL rejects this; SQLite permits it but picks an arbitrary status from the group. Since each (message_id, attempt_number) group should have a single status across recipients, this is likely safe in practice, but it relies on engine-specific behavior.

Also, note that the outer ORDER BY latest DESC duplicates the inner ORDER BY and may be redundant depending on SQL engine behavior with subqueries.

♻️ Suggested fix for explicit aggregation
 		rows, err := e.db.QueryContext(e.ctx, `
-			SELECT status FROM (
-				SELECT status, MAX(created_at) as latest
+			SELECT status FROM (
+				SELECT MAX(status) as status, MAX(created_at) as latest
 				FROM delivery_log
 				WHERE domain = ? AND created_at > ?
 				GROUP BY message_id, attempt_number
 				ORDER BY latest DESC
 				LIMIT 10
 			) sub ORDER BY latest DESC
 		`, domain, oneHourAgo)

Or use MIN(status) - the key is being explicit about which status is selected when multiple recipients exist for the same (message_id, attempt_number).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/smtp/delivery/delivery.go` around lines 1066 - 1094, The SQL selects
status without aggregation in the subquery inside the method using
e.db.QueryContext (the block that builds recent attempts for a domain), which
can return an arbitrary status for a grouped (message_id, attempt_number);
change the subquery to explicitly aggregate the status (e.g., use MIN(status) or
MAX(status) in the SELECT and adjust references accordingly) so the chosen
status is deterministic, and remove the redundant inner ORDER BY if desired so
only the outer ORDER BY latest DESC is used; update any variable names/comments
near the QueryContext call and the rows scanning loop (consecutiveFailures
logic) to reflect the aggregated column.

1332-1348: Potential unbounded goroutine growth under high throughput.

The async dispatch spawns a new goroutine per event without any backpressure mechanism. Under sustained high load, if the handler (which does synchronous DB queries per the context snippet) is slow, goroutines can accumulate.

Consider adding a bounded worker pool or semaphore to limit concurrent event handlers, or at minimum add metrics/logging when goroutine count is high.

♻️ Optional: Add bounded concurrency
// Add to Engine struct:
// eventSem chan struct{} // initialized with capacity, e.g., 100

func (e *Engine) fireEvent(ctx context.Context, event DeliveryEvent) {
	h, _ := e.eventHandler.Load().(DeliveryEventHandler)
	if h == nil {
		return
	}
	
	// Optional: bounded concurrency
	// select {
	// case e.eventSem <- struct{}{}:
	// default:
	//     e.logger.Warn("Event handler backpressure, dropping event", "message_id", event.SMTPMessageID)
	//     return
	// }
	
	go func(ev DeliveryEvent) {
		defer func() {
			// <-e.eventSem // release semaphore
			if r := recover(); r != nil {
				e.logger.Error("Panic in delivery event handler",
					"error", fmt.Sprintf("%v", r),
					"message_id", ev.SMTPMessageID)
			}
		}()
		h(context.Background(), ev)
	}(event)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/smtp/delivery/delivery.go` around lines 1332 - 1348, The fireEvent
method on Engine currently spawns an unbounded goroutine per DeliveryEvent via
the eventHandler which can lead to goroutine buildup under high throughput;
modify Engine.fireEvent to limit concurrency by introducing a bounded
semaphore/channel (e.g., eventSem on Engine) with a fixed capacity, acquire a
slot before launching the handler (or drop/log the event when full), and always
release the slot in the deferred recovery block; keep the existing panic
recovery and use e.logger to warn when backpressure causes drops so you can
observe when the semaphore is saturated (references: Engine.fireEvent,
Engine.eventHandler, DeliveryEventHandler, eventSem, e.logger).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 87092682-0295-4e14-bacb-cd24ea0a0b38

📥 Commits

Reviewing files that changed from the base of the PR and between 3741de4 and 87374fa.

📒 Files selected for processing (4)
  • internal/api/server.go
  • internal/queue/redis.go
  • internal/smtp/delivery/delivery.go
  • internal/smtp/delivery/delivery_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/api/server.go

devin-ai-integration[bot]

This comment was marked as resolved.

Bare column `status` in GROUP BY works on SQLite but fails on PostgreSQL
with "must appear in GROUP BY or aggregate function". Since all rows in
each (message_id, attempt_number) group share the same status, MAX()
returns the correct value on all backends.
