Skip to content

Commit 045cf38

Browse files
fix(pubsub/rabbitmq): use unique consumer tags to prevent subscription disruption on restart (#4326)
Signed-off-by: Javier Aliaga <javier@diagrid.io> Co-authored-by: Mike Nguyen <hey@mike.ee>
1 parent 5a936f7 commit 045cf38

File tree

3 files changed

+243
-4
lines changed

3 files changed

+243
-4
lines changed

pubsub/rabbitmq/rabbitmq.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"sync/atomic"
2727
"time"
2828

29+
"github.com/google/uuid"
30+
2931
amqp "github.com/rabbitmq/amqp091-go"
3032

3133
common "github.com/dapr/components-contrib/common/component/rabbitmq"
@@ -91,6 +93,7 @@ type rabbitMQChannelBroker interface {
9193
QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) (amqp.Queue, error)
9294
QueueBind(name string, key string, exchange string, noWait bool, args amqp.Table) error
9395
Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
96+
Cancel(consumer string, noWait bool) error
9497
Nack(tag uint64, multiple bool, requeue bool) error
9598
Ack(tag uint64, multiple bool) error
9699
ExchangeDeclare(name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table) error
@@ -528,9 +531,19 @@ func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeReq
528531
break
529532
}
530533

534+
// Generate a unique consumer tag to avoid "attempt to reuse consumer tag"
535+
// errors when re-subscribing, which would cause a connection-level exception
536+
// and disrupt all other subscriptions sharing this channel.
537+
// AMQP 0-9-1 limits consumer tags to 255 bytes. Truncate from the
538+
// left so the UUID suffix (which guarantees uniqueness) is preserved.
539+
consumerTag := queueName + "-" + uuid.NewString()
540+
const maxConsumerTagLen = 255
541+
if len(consumerTag) > maxConsumerTagLen {
542+
consumerTag = consumerTag[len(consumerTag)-maxConsumerTagLen:]
543+
}
531544
msgs, err = channel.Consume(
532545
q.Name,
533-
queueName, // consumerID
546+
consumerTag, // unique consumerID per subscription attempt
534547
r.metadata.AutoAck, // autoAck
535548
false, // exclusive
536549
false, // noLocal
@@ -542,25 +555,35 @@ func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeReq
542555
break
543556
}
544557

558+
r.logger.Debugf("%s registered consumer %s for queue %s", logMessagePrefix, consumerTag, q.Name)
559+
545560
// one-time notification on successful subscribe
546561
if ackCh != nil {
547562
ackCh <- false
548563
ackCh = nil
549564
}
550565

551566
err = r.listenMessages(ctx, channel, msgs, req.Topic, handler)
567+
// Always cancel the consumer server-side so RabbitMQ can
568+
// release the registration. noWait=true avoids blocking if
569+
// the channel is already closing.
570+
if cancelErr := channel.Cancel(consumerTag, true); cancelErr != nil {
571+
r.logger.Debugf("%s failed to cancel consumer %s: %v", logMessagePrefix, consumerTag, cancelErr)
572+
}
552573
if err != nil {
553574
errFuncName = "listenMessages"
554575
break
555576
}
556577
}
557578

558-
if strings.Contains(err.Error(), errorInvalidQueueType) {
559-
ackCh <- true
579+
if err != nil && strings.Contains(err.Error(), errorInvalidQueueType) {
580+
if ackCh != nil {
581+
ackCh <- true
582+
}
560583
return
561584
}
562585

563-
if err == context.Canceled || err == context.DeadlineExceeded {
586+
if err != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
564587
// Subscription context was canceled
565588
r.logger.Infof("%s subscription for %s has context canceled", logMessagePrefix, queueName)
566589
return
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
//go:build integration_test
2+
// +build integration_test
3+
4+
/*
5+
Copyright 2026 The Dapr Authors
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package rabbitmq
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
"sync/atomic"
24+
"testing"
25+
"time"
26+
27+
amqp "github.com/rabbitmq/amqp091-go"
28+
"github.com/stretchr/testify/assert"
29+
"github.com/stretchr/testify/require"
30+
31+
mdata "github.com/dapr/components-contrib/metadata"
32+
"github.com/dapr/components-contrib/pubsub"
33+
"github.com/dapr/kit/logger"
34+
)
35+
36+
const (
37+
testRabbitMQURL = "amqp://test:test@localhost:5672"
38+
)
39+
40+
// TestSubscriptionRestart verifies that restarting one subscription does not
41+
// disrupt other active subscriptions sharing the same connection/channel.
42+
//
43+
// Regression test for https://github.com/dapr/java-sdk/issues/1701 where
44+
// reusing consumer tags on the shared channel caused a connection-level
45+
// "attempt to reuse consumer tag" exception that killed all subscriptions.
46+
func TestSubscriptionRestart(t *testing.T) {
47+
// Verify RabbitMQ is reachable
48+
conn, err := amqp.Dial(testRabbitMQURL)
49+
require.NoError(t, err, "RabbitMQ must be running at %s", testRabbitMQURL)
50+
conn.Close()
51+
52+
log := logger.NewLogger("test")
53+
54+
r := NewRabbitMQ(log).(*rabbitMQ)
55+
err = r.Init(t.Context(), pubsub.Metadata{Base: mdata.Base{
56+
Properties: map[string]string{
57+
metadataConnectionStringKey: testRabbitMQURL,
58+
metadataConsumerIDKey: "integration-test",
59+
metadataDurableKey: "true",
60+
metadataDeleteWhenUnusedKey: "false",
61+
metadataRequeueInFailureKey: "true",
62+
},
63+
}})
64+
require.NoError(t, err)
65+
defer r.Close()
66+
67+
topicStable := "stable-topic"
68+
topicRestart := "restart-topic"
69+
70+
var stableCount atomic.Int32
71+
var restartCount atomic.Int32
72+
73+
stableHandler := func(_ context.Context, msg *pubsub.NewMessage) error {
74+
stableCount.Add(1)
75+
return nil
76+
}
77+
restartHandler := func(_ context.Context, msg *pubsub.NewMessage) error {
78+
restartCount.Add(1)
79+
return nil
80+
}
81+
82+
// Subscribe to both topics
83+
ctx, cancel := context.WithCancel(t.Context())
84+
defer cancel()
85+
86+
err = r.Subscribe(ctx, pubsub.SubscribeRequest{Topic: topicStable}, stableHandler)
87+
require.NoError(t, err)
88+
89+
// Use a separate context for the restart topic so we can cancel it independently
90+
restartCtx, restartCancel := context.WithCancel(t.Context())
91+
92+
err = r.Subscribe(restartCtx, pubsub.SubscribeRequest{Topic: topicRestart}, restartHandler)
93+
require.NoError(t, err)
94+
95+
// Phase 1: Verify both subscriptions receive messages
96+
publishN(t, r, topicStable, 5)
97+
publishN(t, r, topicRestart, 5)
98+
99+
assert.Eventually(t, func() bool {
100+
return stableCount.Load() >= 5 && restartCount.Load() >= 5
101+
}, 10*time.Second, 100*time.Millisecond, "both subscriptions should receive messages")
102+
103+
t.Logf("Phase 1 passed: stable=%d, restart=%d", stableCount.Load(), restartCount.Load())
104+
105+
// Phase 2: Cancel the restart subscription (simulates stopping a streaming subscription)
106+
restartCancel()
107+
time.Sleep(2 * time.Second)
108+
109+
// Phase 3: Re-subscribe to the restart topic.
110+
// Before the fix, this would reuse the same consumer tag and cause
111+
// RabbitMQ to throw a connection-level "attempt to reuse consumer tag" error,
112+
// killing the stable subscription too.
113+
restartCount.Store(0)
114+
stableCount.Store(0)
115+
116+
restartCtx2, restartCancel2 := context.WithCancel(t.Context())
117+
defer restartCancel2()
118+
119+
err = r.Subscribe(restartCtx2, pubsub.SubscribeRequest{Topic: topicRestart}, restartHandler)
120+
require.NoError(t, err, "re-subscribe should succeed without connection errors")
121+
122+
// Phase 4: Verify the stable subscription was NOT disrupted
123+
publishN(t, r, topicStable, 5)
124+
publishN(t, r, topicRestart, 5)
125+
126+
assert.Eventually(t, func() bool {
127+
return stableCount.Load() >= 5
128+
}, 10*time.Second, 100*time.Millisecond,
129+
"stable subscription must still work after restart (got %d messages)", stableCount.Load())
130+
131+
assert.Eventually(t, func() bool {
132+
return restartCount.Load() >= 5
133+
}, 10*time.Second, 100*time.Millisecond,
134+
"restarted subscription must receive messages (got %d messages)", restartCount.Load())
135+
136+
t.Logf("Phase 4 passed: stable=%d, restart=%d", stableCount.Load(), restartCount.Load())
137+
}
138+
139+
// TestMultipleSubscriptionsIsolation verifies that multiple concurrent
140+
// subscriptions operate independently on the shared channel.
141+
func TestMultipleSubscriptionsIsolation(t *testing.T) {
142+
conn, err := amqp.Dial(testRabbitMQURL)
143+
require.NoError(t, err, "RabbitMQ must be running at %s", testRabbitMQURL)
144+
conn.Close()
145+
146+
log := logger.NewLogger("test")
147+
148+
r := NewRabbitMQ(log).(*rabbitMQ)
149+
err = r.Init(t.Context(), pubsub.Metadata{Base: mdata.Base{
150+
Properties: map[string]string{
151+
metadataConnectionStringKey: testRabbitMQURL,
152+
metadataConsumerIDKey: "isolation-test",
153+
metadataDurableKey: "true",
154+
metadataDeleteWhenUnusedKey: "false",
155+
},
156+
}})
157+
require.NoError(t, err)
158+
defer r.Close()
159+
160+
const numTopics = 5
161+
const msgsPerTopic = 10
162+
163+
var counts [numTopics]atomic.Int32
164+
165+
// Subscribe to all topics
166+
for i := range numTopics {
167+
topic := fmt.Sprintf("isolation-topic-%d", i)
168+
idx := i
169+
err := r.Subscribe(t.Context(), pubsub.SubscribeRequest{Topic: topic}, func(_ context.Context, msg *pubsub.NewMessage) error {
170+
counts[idx].Add(1)
171+
return nil
172+
})
173+
require.NoError(t, err)
174+
}
175+
176+
// Publish concurrently
177+
var wg sync.WaitGroup
178+
for i := range numTopics {
179+
wg.Add(1)
180+
go func(i int) {
181+
defer wg.Done()
182+
topic := fmt.Sprintf("isolation-topic-%d", i)
183+
publishN(t, r, topic, msgsPerTopic)
184+
}(i)
185+
}
186+
wg.Wait()
187+
188+
// All topics should receive their messages
189+
assert.Eventually(t, func() bool {
190+
for i := range numTopics {
191+
if counts[i].Load() < msgsPerTopic {
192+
return false
193+
}
194+
}
195+
return true
196+
}, 15*time.Second, 100*time.Millisecond, "all topics should receive messages")
197+
198+
for i := range numTopics {
199+
t.Logf("topic %d: %d messages", i, counts[i].Load())
200+
}
201+
}
202+
203+
func publishN(t *testing.T, r pubsub.PubSub, topic string, n int) {
204+
t.Helper()
205+
for i := range n {
206+
err := r.Publish(t.Context(), &pubsub.PublishRequest{
207+
Topic: topic,
208+
Data: []byte(fmt.Sprintf("msg-%d", i)),
209+
})
210+
require.NoError(t, err)
211+
}
212+
}

pubsub/rabbitmq/rabbitmq_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,10 @@ func (r *rabbitMQInMemoryBroker) Consume(queue string, consumer string, autoAck
511511
return r.buffer, nil
512512
}
513513

514+
func (r *rabbitMQInMemoryBroker) Cancel(consumer string, noWait bool) error {
515+
return nil
516+
}
517+
514518
func (r *rabbitMQInMemoryBroker) Nack(tag uint64, multiple bool, requeue bool) error {
515519
return nil
516520
}

0 commit comments

Comments
 (0)