SQS messages idle in the queue for up to 99 seconds until ReceiveMessage picks them up


I have an ETL process that converts files from S3. An S3 event notification sends a message to SQS, and a Fargate task polls the queue for the message. My problem is that the message doesn't get picked up for a variable amount of time, up to 99 seconds.

I'm polling for messages every 2 seconds. I can confirm through the console that a message is in the queue, but it doesn't get read for quite a while. I'm also logging how long each message sat in the queue, based on the difference between MessageSystemAttributeNameApproximateFirstReceiveTimestamp and MessageSystemAttributeNameSentTimestamp.

    queueURL := os.Getenv("QUEUE_URL")
    csvBucket := os.Getenv("CSV_BUCKET_NAME")

    sess, err := session.NewSession()
    if err != nil {
        log.Fatalf("failed to create session: %v", err)
    }
    svc := sqs.New(sess)

    for {
        time.Sleep(time.Duration(2) * time.Second)

        msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
            AttributeNames: []*string{
                aws.String(sqs.MessageSystemAttributeNameSenderId),
                aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
                aws.String(sqs.MessageSystemAttributeNameApproximateReceiveCount),
                aws.String(sqs.MessageSystemAttributeNameApproximateFirstReceiveTimestamp),
                aws.String(sqs.MessageSystemAttributeNameSequenceNumber),
                aws.String(sqs.MessageSystemAttributeNameMessageDeduplicationId),
                aws.String(sqs.MessageSystemAttributeNameMessageGroupId),
                aws.String(sqs.MessageSystemAttributeNameAwstraceHeader),
            },
            MessageAttributeNames: []*string{
                aws.String(sqs.QueueAttributeNameAll),
            },
            QueueUrl:            aws.String(queueURL),
            MaxNumberOfMessages: aws.Int64(10),
            VisibilityTimeout:   aws.Int64(30), // 30 seconds
            WaitTimeSeconds:     aws.Int64(0),
        })

        if err != nil {
            log.Printf("receive message failed: %v", err)
            return
        }

        if len(msgResult.Messages) == 0 {
            continue
        }
        //TODO: process all results
        // fmt.Printf("Success: %+v\n", msgResult.Messages)
        log.Printf("amount of messages %v", len(msgResult.Messages))

        i, err := strconv.ParseInt(*msgResult.Messages[0].Attributes["ApproximateFirstReceiveTimestamp"], 10, 64)
        if err != nil {
            log.Printf("failed to parse int: %v", err)
            return
        }
        tm1 := time.Unix(0, i*int64(time.Millisecond))

        ii, err := strconv.ParseInt(*msgResult.Messages[0].Attributes["SentTimestamp"], 10, 64)
        if err != nil {
            log.Printf("failed to parse int: %v", err)
            return
        }
        tm2 := time.Unix(0, ii*int64(time.Millisecond))
        log.Printf("seconds the message was unprocessed in the queue: %.3f", tm1.Sub(tm2).Seconds())

/* some code to download from s3 and upload to another bucket */

        _, err = svc.DeleteMessage(&sqs.DeleteMessageInput{
            QueueUrl:      aws.String(queueURL),
            ReceiptHandle: msgResult.Messages[0].ReceiptHandle,
        })
        if err != nil {
            log.Printf("delete failed: %v", err)
            continue
        }
        log.Println("message delete succeeded")
    }
        
timestamp,message
1599145270790,2020/09/03 15:01:10 seconds the message was unprocessed in the queue: 0.909
1599145426542,2020/09/03 15:03:46 seconds the message was unprocessed in the queue: 19.835
1599145472884,2020/09/03 15:04:32 seconds the message was unprocessed in the queue: 4.721
1599145611897,2020/09/03 15:06:51 seconds the message was unprocessed in the queue: 24.793
1599145720293,2020/09/03 15:08:40 seconds the message was unprocessed in the queue: 40.296
1599145930662,2020/09/03 15:12:10 seconds the message was unprocessed in the queue: 66.736
1599145997155,2020/09/03 15:13:17 seconds the message was unprocessed in the queue: 10.961
1599146249316,2020/09/03 15:17:29 seconds the message was unprocessed in the queue: 99.084
1599146319998,2020/09/03 15:18:39 seconds the message was unprocessed in the queue: 35.015
1599146361620,2020/09/03 15:19:21 seconds the message was unprocessed in the queue: 17.353
1599146438448,2020/09/03 15:20:38 seconds the message was unprocessed in the queue: 45.878
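For reference, the delay figure in those log lines comes from the two millisecond-epoch attributes the loop parses. A self-contained sketch of that computation (the helper name `queueDelaySeconds` and the sample sent timestamp are mine, chosen to reproduce the 99.084-second entry above):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// queueDelaySeconds mirrors the computation in the polling loop: both SQS
// attributes are Unix epoch timestamps in milliseconds, so the difference
// between ApproximateFirstReceiveTimestamp and SentTimestamp is how long
// the message sat unread in the queue.
func queueDelaySeconds(firstReceiveMs, sentMs string) (float64, error) {
	i, err := strconv.ParseInt(firstReceiveMs, 10, 64)
	if err != nil {
		return 0, err
	}
	ii, err := strconv.ParseInt(sentMs, 10, 64)
	if err != nil {
		return 0, err
	}
	tm1 := time.Unix(0, i*int64(time.Millisecond))
	tm2 := time.Unix(0, ii*int64(time.Millisecond))
	return tm1.Sub(tm2).Seconds(), nil
}

func main() {
	// First-receive timestamp taken from the 15:17:29 log entry; the sent
	// timestamp is a made-up value 99084 ms earlier.
	d, _ := queueDelaySeconds("1599146249316", "1599146150232")
	fmt.Printf("%.3f\n", d) // → 99.084
}
```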

I'm pretty lost as to what I'm doing wrong, especially because the next stage behaves fine: the converted file is uploaded to another bucket, a message lands on that bucket's queue, and a Lambda picks it up consistently within about a second.

CDK code that defines the pipeline:

        super(scope, id)

        const dlq = new sqs.Queue(this, 'DeadLetterQueue', {
            retentionPeriod: cdk.Duration.days(14),
        });

        const q = new sqs.Queue(this, 'ProcessingQueue', {
            deadLetterQueue: {
                queue: dlq,
                maxReceiveCount: 3,
            },
        });

        const queueFargate = new ecsPatterns.QueueProcessingFargateService(this, 'Service', {
            queue: q,
            vpc: props.vpc,
            memoryLimitMiB: 512,
            cpu: 256,
            image: ecs.ContainerImage.fromAsset('services/convert'),
            platformVersion: ecs.FargatePlatformVersion.VERSION1_4,
            desiredTaskCount: 1,
            environment: {
                QUEUE_URL: q.queueUrl,
                CSV_BUCKET_NAME: props.convertBucket.bucketName,
            },
        });

        props.uploadBucket.addEventNotification(s3.EventType.OBJECT_CREATED, new s3n.SqsDestination(queueFargate.sqsQueue))
        props.uploadBucket.grantRead(queueFargate.service.taskDefinition.taskRole)
        props.convertBucket.grantWrite(queueFargate.service.taskDefinition.taskRole)

Version of AWS SDK for Go:

github.com/aws/aws-sdk-go v1.34.6

go 1.14

Expected behavior

Messages should be read as soon as they are in the queue.

1 Answer
This could be the result of using short polling instead of long polling. Per the AWS documentation on Amazon SQS short and long polling:

With short polling, the ReceiveMessage request queries only a subset of the servers (based on a weighted random distribution) to find messages that are available to include in the response. Amazon SQS sends the response right away, even if the query found no messages.

Long-polling, on the other hand, will:

Reduce false empty responses by querying all—rather than a subset of—Amazon SQS servers.

You can implement long-polling in your code by passing a non-zero value for WaitTimeSeconds, and removing the two-second sleep on each iteration.
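Applied to the loop from the question, the change is confined to the ReceiveMessage call. A minimal sketch (20 seconds is the maximum wait SQS allows; any non-zero value enables long polling):

```go
// Long polling: ReceiveMessage now blocks for up to WaitTimeSeconds until a
// message arrives, querying all SQS servers rather than a subset, so the
// explicit time.Sleep between iterations is no longer needed.
for {
    msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
        QueueUrl:            aws.String(queueURL),
        MaxNumberOfMessages: aws.Int64(10),
        VisibilityTimeout:   aws.Int64(30),
        WaitTimeSeconds:     aws.Int64(20), // non-zero => long polling
    })
    if err != nil {
        log.Printf("receive message failed: %v", err)
        continue
    }
    // ... process and delete messages as before ...
}
```

Alternatively, long polling can be enabled at the queue level via the ReceiveMessageWaitTimeSeconds queue attribute (in CDK, the `receiveMessageWaitTime` prop on `sqs.Queue`), which applies to every consumer without code changes.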