Skip to content

Commit 8b3bb69

Browse files
committed
feat: use functional options pattern
1 parent 733144a commit 8b3bb69

2 files changed

Lines changed: 278 additions & 65 deletions

File tree

aws/sqs/sqs.go

Lines changed: 183 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,129 @@ import (
1818
)
1919

2020
type Raw struct {
21-
Body string
22-
ReceiptHandle string
23-
Attributes map[string]string
21+
Body string
22+
ReceiptHandle string
23+
Attributes map[string]string
24+
MessageAttributes map[string]string
25+
}
26+
27+
type messageInput struct {
28+
// generic
29+
ctx context.Context
30+
31+
// send
32+
msgAttributes map[string]string
33+
delay *int32
34+
group string
35+
dedupe string
36+
37+
// receive
38+
poll bool
39+
visibilityTimeout *int32
40+
waitTime *int32
41+
queueAttributes []types.QueueAttributeName
42+
}
43+
44+
type MessageResponse struct {
45+
MessageId string
46+
}
47+
48+
type messageInputOptionFunc func(*messageInput)
49+
50+
func (s *SQS) SendMessage(queue string, body string, options ...messageInputOptionFunc) (MessageResponse, error) {
51+
msg := &messageInput{}
52+
for _, option := range options {
53+
option(msg)
54+
}
55+
56+
awsMsg := sqs.SendMessageInput{
57+
QueueUrl: aws.String(queue),
58+
MessageBody: aws.String(body),
59+
}
60+
61+
// delayed message
62+
if msg.delay != nil {
63+
awsMsg.DelaySeconds = *msg.delay
64+
}
65+
66+
// message with attributes
67+
if len(msg.msgAttributes) > 0 {
68+
awsMsg.MessageAttributes = map[string]types.MessageAttributeValue{}
69+
for k, v := range msg.msgAttributes {
70+
awsMsg.MessageAttributes[k] = types.MessageAttributeValue{
71+
DataType: aws.String("String"),
72+
StringValue: aws.String(v),
73+
}
74+
}
75+
}
76+
77+
// message for FIFO queue
78+
if msg.group != "" {
79+
awsMsg.MessageGroupId = aws.String(msg.group)
80+
awsMsg.MessageDeduplicationId = aws.String(msg.dedupe)
81+
}
82+
83+
// context
84+
ctx := msg.ctx
85+
if ctx == nil {
86+
ctx = context.TODO()
87+
}
88+
89+
response, err := s.client.SendMessage(ctx, &awsMsg)
90+
91+
return MessageResponse{MessageId: *response.MessageId}, err
92+
}
93+
94+
func WithDelay(delay int32) messageInputOptionFunc {
95+
return func(m *messageInput) {
96+
m.delay = new(int32)
97+
*m.delay = delay
98+
}
99+
}
100+
101+
func WithPolling(polling bool) messageInputOptionFunc {
102+
return func(m *messageInput) {
103+
m.poll = polling
104+
}
105+
}
106+
107+
func WithWait(waitTimeSeconds int32) messageInputOptionFunc {
108+
return func(m *messageInput) {
109+
m.waitTime = new(int32)
110+
*m.waitTime = waitTimeSeconds
111+
}
112+
}
113+
114+
func WithTimeout(visibilityTimeout int32) messageInputOptionFunc {
115+
return func(m *messageInput) {
116+
m.visibilityTimeout = new(int32)
117+
*m.visibilityTimeout = visibilityTimeout
118+
}
119+
}
120+
121+
func WithMessageAttributes(attributes map[string]string) messageInputOptionFunc {
122+
return func(m *messageInput) {
123+
m.msgAttributes = attributes
124+
}
125+
}
126+
127+
func WithContext(ctx context.Context) messageInputOptionFunc {
128+
return func(m *messageInput) {
129+
m.ctx = ctx
130+
}
131+
}
132+
133+
func WithQueueAttributes(attributes []types.QueueAttributeName) messageInputOptionFunc {
134+
return func(m *messageInput) {
135+
m.queueAttributes = attributes
136+
}
137+
}
138+
139+
func WithFifo(group, dedupe string) messageInputOptionFunc {
140+
return func(m *messageInput) {
141+
m.group = group
142+
m.dedupe = dedupe
143+
}
24144
}
25145

26146
type SQS struct {
@@ -97,13 +217,13 @@ func (s *SQS) Ready() bool {
97217
// Applications should be able to handle duplicate or out of order messages,
98218
// and should back off on Receive error.
99219
func (s *SQS) Receive(queueURL string, visibilityTimeout int32) (Raw, error) {
100-
return s.ReceiveWithContext(context.TODO(), queueURL, visibilityTimeout)
220+
return s.ReceiveMessage(queueURL, WithTimeout(visibilityTimeout))
101221
}
102222

103223
// ReceiveWithAttributes is the same as Receive except that Queue Attributes can be requested
104224
// to be received with the message.
105225
func (s *SQS) ReceiveWithAttributes(queueURL string, visibilityTimeout int32, attrs []types.QueueAttributeName) (Raw, error) {
106-
return s.ReceiveWithContextAttributes(context.TODO(), queueURL, visibilityTimeout, attrs)
226+
return s.ReceiveMessage(queueURL, WithTimeout(visibilityTimeout), WithQueueAttributes(attrs))
107227
}
108228

109229
// ReceiveWithContextAttributes by context and Queue Attributes,
@@ -112,37 +232,70 @@ func (s *SQS) ReceiveWithAttributes(queueURL string, visibilityTimeout int32, at
112232
// when system stop signal is received, an error with message '... context canceled' will be returned
113233
// which can be used to safely stop the system
114234
func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, visibilityTimeout int32, attrs []types.QueueAttributeName) (Raw, error) {
115-
input := sqs.ReceiveMessageInput{
116-
QueueUrl: aws.String(queueURL),
117-
MaxNumberOfMessages: 1,
118-
VisibilityTimeout: visibilityTimeout,
119-
WaitTimeSeconds: 20,
120-
AttributeNames: attrs,
121-
}
122-
return s.receiveMessage(ctx, &input)
235+
return s.ReceiveMessage(queueURL, WithContext(ctx), WithTimeout(visibilityTimeout), WithWait(20), WithQueueAttributes(attrs))
123236
}
124237

125-
// receiveMessage is the common code used internally to receive an SQS message based
126-
// on the provided input.
127-
func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput) (Raw, error) {
238+
func (s *SQS) ReceiveMessage(queue string, options ...messageInputOptionFunc) (Raw, error) {
239+
msg := &messageInput{
240+
poll: true, // keep polling until message received
241+
}
242+
for _, option := range options {
243+
option(msg)
244+
}
128245

129-
for {
130-
r, err := s.client.ReceiveMessage(ctx, input)
131-
if err != nil {
132-
return Raw{}, err
133-
}
246+
awsMsg := sqs.ReceiveMessageInput{
247+
QueueUrl: aws.String(queue),
248+
MessageAttributeNames: []string{"All"},
249+
AttributeNames: msg.queueAttributes,
250+
}
251+
252+
// visibility
253+
if msg.visibilityTimeout != nil {
254+
awsMsg.VisibilityTimeout = *msg.visibilityTimeout
255+
}
256+
257+
// wait time
258+
if msg.waitTime != nil {
259+
awsMsg.WaitTimeSeconds = *msg.waitTime
260+
}
261+
262+
// context
263+
ctx := msg.ctx
264+
if ctx == nil {
265+
ctx = context.TODO()
266+
}
267+
268+
r, err := s.client.ReceiveMessage(ctx, &awsMsg)
269+
if err != nil {
270+
return Raw{}, err
271+
}
134272

273+
for {
135274
switch {
136275
case r == nil || len(r.Messages) == 0:
137276
// no message received
138-
continue
277+
if msg.poll {
278+
continue
279+
} else {
280+
return Raw{}, nil
281+
}
139282
case len(r.Messages) == 1:
140283
raw := r.Messages[0]
141284

285+
msgAttributes := map[string]string{}
286+
for k, v := range raw.MessageAttributes {
287+
if aws.ToString(v.DataType) == "Binary" {
288+
continue
289+
}
290+
291+
msgAttributes[k] = aws.ToString(v.StringValue)
292+
}
293+
142294
m := Raw{
143-
Body: aws.ToString(raw.Body),
144-
ReceiptHandle: aws.ToString(raw.ReceiptHandle),
145-
Attributes: raw.Attributes,
295+
Body: aws.ToString(raw.Body),
296+
ReceiptHandle: aws.ToString(raw.ReceiptHandle),
297+
Attributes: raw.Attributes,
298+
MessageAttributes: msgAttributes,
146299
}
147300
return m, nil
148301
case len(r.Messages) > 1:
@@ -156,13 +309,7 @@ func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput
156309
// when system stop signal is received, an error with message '... context canceled' will be returned
157310
// which can be used to safely stop the system
158311
func (s *SQS) ReceiveWithContext(ctx context.Context, queueURL string, visibilityTimeout int32) (Raw, error) {
159-
input := sqs.ReceiveMessageInput{
160-
QueueUrl: aws.String(queueURL),
161-
MaxNumberOfMessages: 1,
162-
VisibilityTimeout: visibilityTimeout,
163-
WaitTimeSeconds: 20,
164-
}
165-
return s.receiveMessage(ctx, &input)
312+
return s.ReceiveMessage(queueURL, WithContext(ctx), WithTimeout(visibilityTimeout))
166313
}
167314

168315
// Delete deletes the message referred to by receiptHandle from the queue.
@@ -179,49 +326,20 @@ func (s *SQS) Delete(queueURL, receiptHandle string) error {
179326

180327
// Send sends the message body to the SQS queue referred to by queueURL.
181328
func (s *SQS) Send(queueURL string, body string) error {
182-
params := sqs.SendMessageInput{
183-
QueueUrl: aws.String(queueURL),
184-
MessageBody: aws.String(body),
185-
}
186-
187-
_, err := s.client.SendMessage(context.TODO(), &params)
188-
329+
_, err := s.SendMessage(queueURL, body)
189330
return err
190331
}
191332

192333
// SendWithDelay is the same as Send but adds a delay (in seconds) before sending.
193334
func (s *SQS) SendWithDelay(queueURL string, body string, delay int32) error {
194-
params := sqs.SendMessageInput{
195-
QueueUrl: aws.String(queueURL),
196-
MessageBody: aws.String(body),
197-
DelaySeconds: delay,
198-
}
199-
200-
_, err := s.client.SendMessage(context.TODO(), &params)
201-
335+
_, err := s.SendMessage(queueURL, body, WithDelay(delay))
202336
return err
203337
}
204338

205339
// SendFifoMessage puts a message onto the given AWS SQS queue.
206340
func (s *SQS) SendFifoMessage(queue, group, dedupe string, msg []byte) (string, error) {
207-
var id *string
208-
if dedupe != "" {
209-
id = aws.String(dedupe)
210-
}
211-
params := sqs.SendMessageInput{
212-
MessageBody: aws.String(string(msg)),
213-
QueueUrl: aws.String(queue),
214-
MessageGroupId: aws.String(group),
215-
MessageDeduplicationId: id,
216-
}
217-
output, err := s.client.SendMessage(context.TODO(), &params)
218-
if err != nil {
219-
return "", err
220-
}
221-
if id := output.MessageId; id != nil {
222-
return *id, nil
223-
}
224-
return "", nil
341+
r, err := s.SendMessage(queue, string(msg), WithFifo(group, dedupe))
342+
return r.MessageId, err
225343
}
226344

227345
// Leverage the sendbatch api for uploading large numbers of messages

0 commit comments

Comments
 (0)