I have an ETL process that converts files from s3. S3 event notification sends a message to SQS and a fargate task polls the queue for the message. My problem is that the message doesn't get picked up for a variable amount of time, up to 99seconds.
I*m looking for messages every 2 seconds. I can confirm through the console that a message is in the queue, but it doesn't get read for quite a while. Also, I'm printing the delay the message was in the queue in the log, based on the difference between MessageSystemAttributeNameApproximateReceiveCount and MessageSystemAttributeNameSentTimestamp
queueURL := os.Getenv("QUEUE_URL")
csvBucket := os.Getenv("CSV_BUCKET_NAME")
sess, _ := session.NewSession()
//TODO: check and handle error
svc := sqs.New(sess)
for {
time.Sleep(time.Duration(2) * time.Second)
msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSenderId),
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
aws.String(sqs.MessageSystemAttributeNameApproximateReceiveCount),
aws.String(sqs.MessageSystemAttributeNameApproximateFirstReceiveTimestamp),
aws.String(sqs.MessageSystemAttributeNameSequenceNumber),
aws.String(sqs.MessageSystemAttributeNameMessageDeduplicationId),
aws.String(sqs.MessageSystemAttributeNameMessageGroupId),
aws.String(sqs.MessageSystemAttributeNameAwstraceHeader),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(30), // 60 seconds
WaitTimeSeconds: aws.Int64(0),
})
if err != nil {
log.Printf("recieve message failed: %v", err)
return
}
if len(msgResult.Messages) == 0 {
continue
}
//TODO: process all results
// fmt.Printf("Success: %+v\n", msgResult.Messages)
log.Printf("amount of messages %v", len(msgResult.Messages))
i, err := strconv.ParseInt(*msgResult.Messages[0].Attributes["ApproximateFirstReceiveTimestamp"], 10, 64)
if err != nil {
log.Printf("failed to parse int: %v", err)
return
}
tm1 := time.Unix(0, i*int64(1000000))
ii, err := strconv.ParseInt(*msgResult.Messages[0].Attributes["SentTimestamp"], 10, 64)
if err != nil {
panic(err)
}
tm2 := time.Unix(0, ii*int64(1000000))
/* some code to download from s3 and upload to another bucket */
_, err = svc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: msgResult.Messages[0].ReceiptHandle,
})
if err != nil {
log.Println("delete failed")
continue
}
log.Println("message delete succeeded")
}
timestamp,message
1599145270790,2020/09/03 15:01:10 seconds the message was unprocessed in the queue: 0.909
1599145426542,2020/09/03 15:03:46 seconds the message was unprocessed in the queue: 19.835
1599145472884,2020/09/03 15:04:32 seconds the message was unprocessed in the queue: 4.721
1599145611897,2020/09/03 15:06:51 seconds the message was unprocessed in the queue: 24.793
1599145720293,2020/09/03 15:08:40 seconds the message was unprocessed in the queue: 40.296
1599145930662,2020/09/03 15:12:10 seconds the message was unprocessed in the queue: 66.736
1599145997155,2020/09/03 15:13:17 seconds the message was unprocessed in the queue: 10.961
1599146249316,2020/09/03 15:17:29 seconds the message was unprocessed in the queue: 99.084
1599146319998,2020/09/03 15:18:39 seconds the message was unprocessed in the queue: 35.015
1599146361620,2020/09/03 15:19:21 seconds the message was unprocessed in the queue: 17.353
1599146438448,2020/09/03 15:20:38 seconds the message was unprocessed in the queue: 45.878
I'm pretty lost what I'm doing wrong, after that I upload to another bucket into a queue and it gets picked up by a lambda, which consistently gets done in about a second
cdk code that defines the pipeline
super(scope, id)
const dlq = new sqs.Queue(this, 'DeadLetterQueue', {
retentionPeriod: cdk.Duration.days(14),
});
const q = new sqs.Queue(this, 'ProcessingQueue', {
deadLetterQueue: {
queue: dlq,
maxReceiveCount: 3,
},
});
const queueFargate = new ecsPatterns.QueueProcessingFargateService(this, 'Service', {
queue: q,
vpc: props.vpc,
memoryLimitMiB: 512,
cpu: 256,
image: ecs.ContainerImage.fromAsset('services/convert'),
platformVersion: ecs.FargatePlatformVersion.VERSION1_4,
desiredTaskCount: 1,
environment: {
QUEUE_URL: q.queueUrl,
CSV_BUCKET_NAME: props.convertBucket.bucketName,
},
});
props.uploadBucket.addEventNotification(s3.EventType.OBJECT_CREATED, new s3n.SqsDestination(queueFargate.sqsQueue))
props.uploadBucket.grantRead(queueFargate.service.taskDefinition.taskRole)
props.convertBucket.grantWrite(queueFargate.service.taskDefinition.taskRole)
Version of AWS SDK for Go?
github.com/aws/aws-sdk-go v1.34.6
go 1.14
Expected behavior
Messages should be read as soon as they are in the queue.
This could be the result of using short-polling instead of long-polling. Per the AWS documentation on Amazon SQS short and long polling:
Long-polling, on the other hand, will:
You can implement long-polling in your code by passing a non-zero value for
WaitTimeSeconds
, and removing the two-second sleep on each iteration.