Skip to content
72 changes: 72 additions & 0 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func NewServer(
// Initialize scheduler for scheduled emails
s.scheduler = NewScheduler(db, s, 30*time.Second)

// Register delivery event handler for webhooks and auto-suppression
if deliveryEngine != nil {
s.registerDeliveryEventHandler(deliveryEngine)
}

return s, nil
}

Expand Down Expand Up @@ -228,6 +233,73 @@ func (s *Server) canSendFromDomain(ctx context.Context, email string) (bool, err
return emailDomain == domainName, nil
}

// registerDeliveryEventHandler hooks the API server into delivery engine events
// so that webhooks fire and bounced addresses get auto-suppressed.
func (s *Server) registerDeliveryEventHandler(engine *delivery.Engine) {
engine.SetEventHandler(func(ctx context.Context, event delivery.DeliveryEvent) {
if event.SMTPMessageID == "" {
return
}

fullMessageID := "<" + event.SMTPMessageID + ">"

// Look up the domain_id and recipient for this message
var domainID int64
var recipient string
err := s.db.QueryRowContext(ctx,
`SELECT domain_id, to_email FROM sent_emails WHERE message_id = ? LIMIT 1`,
fullMessageID,
).Scan(&domainID, &recipient)
if err != nil {
// Not an API-sent email (e.g., relayed SMTP) — skip
return
}

switch event.Status {
case "delivered":
go s.triggerWebhook(ctx, domainID, EventDelivered, &WebhookEvent{
Event: EventDelivered,
Timestamp: time.Now(),
MessageID: fullMessageID,
Recipient: recipient,
})

case "bounced":
go s.triggerWebhook(ctx, domainID, EventBounced, &WebhookEvent{
Event: EventBounced,
Timestamp: time.Now(),
MessageID: fullMessageID,
Recipient: recipient,
Data: map[string]interface{}{
"reason": event.ErrorMessage,
"smtp_code": event.SMTPCode,
},
})

// Auto-suppress hard-bounced addresses
if s.suppression != nil {
if suppressErr := s.suppression.AddFromBounce(ctx, domainID, recipient); suppressErr != nil {
s.logger.Warn("Failed to auto-suppress bounced address",
"message_id", fullMessageID, "error", suppressErr.Error())
} else {
s.logger.Info("Auto-suppressed bounced address", "message_id", fullMessageID)
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

case "failed":
go s.triggerWebhook(ctx, domainID, EventFailed, &WebhookEvent{
Event: EventFailed,
Timestamp: time.Now(),
MessageID: fullMessageID,
Recipient: recipient,
Data: map[string]interface{}{
"reason": event.ErrorMessage,
},
})
}
})
}

// splitEmail splits an email into local part and domain
func splitEmail(email string) []string {
for i := len(email) - 1; i >= 0; i-- {
Expand Down
54 changes: 54 additions & 0 deletions internal/queue/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,60 @@ func (q *RedisQueue) ListFailed(ctx context.Context, limit int64) ([]*Message, e
return messages, nil
}

// ActiveMessagePaths returns the set of MessagePath values for all messages
// currently in pending or processing state. Used by the orphan file cleanup
// to avoid deleting files still referenced by active queue entries.
//
// This method fails closed: if any Redis operation fails, it returns an error
// so the caller can skip cleanup rather than risk deleting live message files.
func (q *RedisQueue) ActiveMessagePaths(ctx context.Context) (map[string]bool, error) {
if err := q.validateContext(ctx); err != nil {
return nil, err
}

paths := make(map[string]bool)

// Collect message IDs from all pending priority queues
var msgIDs []string
for _, key := range q.allPendingKeys() {
members, err := q.client.ZRange(ctx, key, 0, -1).Result()
if err != nil {
return nil, fmt.Errorf("failed to enumerate pending queue %s: %w", key, err)
}
msgIDs = append(msgIDs, members...)
}

// Legacy pending queue
legacyMembers, err := q.client.ZRange(ctx, q.pendingKey(), 0, -1).Result()
if err != nil {
return nil, fmt.Errorf("failed to enumerate legacy pending queue: %w", err)
}
msgIDs = append(msgIDs, legacyMembers...)

// Processing set
procMembers, err := q.client.SMembers(ctx, q.processingKey()).Result()
if err != nil {
return nil, fmt.Errorf("failed to enumerate processing set: %w", err)
}
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
msgIDs = append(msgIDs, procMembers...)

// Resolve each message ID to its MessagePath.
// GetMessage failures for individual messages are non-fatal (the message
// data may have expired from Redis while the queue entry remains), but we
// still include all successfully resolved paths.
for _, id := range msgIDs {
msg, err := q.GetMessage(ctx, id)
if err != nil {
continue
}
if msg.MessagePath != "" {
paths[msg.MessagePath] = true
}
}

return paths, nil
}

// ListSent returns recently sent messages up to limit.
func (q *RedisQueue) ListSent(ctx context.Context, limit int64) ([]*Message, error) {
if err := q.validateContext(ctx); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions internal/resilience/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ func (cb *CircuitBreaker) Reset() {
cb.transitionTo(StateClosed)
}

// ForceOpen forces the circuit breaker into open state.
// Used to pre-open breakers for domains known to be failing (e.g., on startup
// from delivery_log history) to avoid hammering broken servers.
func (cb *CircuitBreaker) ForceOpen() {
atomic.StoreInt64(&cb.lastFailureTime, time.Now().UnixNano())
cb.transitionTo(StateOpen)
}

// Validate checks if the circuit breaker configuration is valid.
func (cfg Config) Validate() error {
if cfg.Name == "" {
Expand Down
27 changes: 27 additions & 0 deletions internal/resilience/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,3 +1237,30 @@ func TestCircuitBreaker_CompleteLifecycle(t *testing.T) {

t.Log("Complete lifecycle test passed")
}

func TestForceOpen(t *testing.T) {
cb := NewCircuitBreaker(Config{
Name: "test-force-open",
FailureThreshold: 5,
SuccessThreshold: 2,
Timeout: 10 * time.Second,
})

if cb.State() != StateClosed {
t.Fatalf("initial state should be closed, got %v", cb.State())
}

cb.ForceOpen()

if cb.State() != StateOpen {
t.Errorf("state after ForceOpen should be open, got %v", cb.State())
}

// Requests should be rejected
err := cb.Execute(context.Background(), func(ctx context.Context) error {
return nil
})
if !errors.Is(err, ErrCircuitOpen) {
t.Errorf("expected ErrCircuitOpen after ForceOpen, got %v", err)
}
}
Loading
Loading