@@ -3,6 +3,7 @@ package queue
33import (
44 "fmt"
55 "testing"
6+ "time"
67
78 "github.com/stretchr/testify/assert"
89 "github.com/stretchr/testify/require"
@@ -144,3 +145,54 @@ func TestAMQPHeaders(t *testing.T) {
144145 })
145146 }
146147}
148+
149+ func TestAMQPRepublishBuried (t * testing.T ) {
150+ broker , err := NewBroker (testAMQPURI )
151+ require .NoError (t , err )
152+ defer func () { require .NoError (t , broker .Close ()) }()
153+
154+ queueName := newName ()
155+ queue , err := broker .Queue (queueName )
156+ require .NoError (t , err )
157+
158+ amqpQueue , ok := queue .(* AMQPQueue )
159+ require .True (t , ok )
160+
161+ buried := amqpQueue .buriedQueue
162+
163+ tests := []struct {
164+ name string
165+ payload string
166+ }{
167+ {name : "message 1" , payload : "payload 1" },
168+ {name : "message 2" , payload : "republish" },
169+ {name : "message 3" , payload : "payload 3" },
170+ {name : "message 3" , payload : "payload 4" },
171+ }
172+
173+ for _ , test := range tests {
174+ job , err := NewJob ()
175+ require .NoError (t , err )
176+
177+ job .raw = []byte (test .payload )
178+
179+ err = buried .Publish (job )
180+ require .NoError (t , err )
181+ time .Sleep (1 * time .Second )
182+ }
183+
184+ var condition RepublishConditionFunc = func (j * Job ) bool {
185+ return string (j .raw ) == "republish"
186+ }
187+
188+ err = queue .RepublishBuried (condition )
189+ require .NoError (t , err )
190+
191+ jobIter , err := queue .Consume (1 )
192+ require .NoError (t , err )
193+ defer func () { require .NoError (t , jobIter .Close ()) }()
194+
195+ job , err := jobIter .Next ()
196+ require .NoError (t , err )
197+ require .Equal (t , string (job .raw ), "republish" )
198+ }
0 commit comments