Anypoint MQ messages remain in flight and are not processed


I have a flow that submits around 10-20 Salesforce bulk query job details to Anypoint MQ to be processed asynchronously. I am using a standard queue, not a FIFO queue, and want to process one message at a time. My subscriber configuration is given below. I set the acknowledgement timeout to a whopping 15 minutes because the longest a job has taken to change status from jobUpload to JobCompleted is 15 minutes.

Mule Runtime: 4.4, Anypoint MQ Connector version: 3.2.0

<anypoint-mq:subscriber doc:name="Subscribering Bulk Query Job Details" 
                        config-ref="Anypoint_MQ_Config" 
                        destination="${anyPointMq.name}" 
                        acknowledgementTimeout="15" 
                        acknowledgementTimeoutUnit="MINUTES">
            <anypoint-mq:subscriber-type >
                <anypoint-mq:prefetch maxLocalMessages="1" />
            </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

Anypoint MQ Connector Configuration

<anypoint-mq:config name="Anypoint_MQ_Config" doc:name="Anypoint MQ Config" doc:id="ce3aaed9-dcba-41bc-8c68-037c5b1420e2">
      <anypoint-mq:connection clientId="${secure::anyPointMq.clientId}" clientSecret="${secure::anyPointMq.clientSecret}" url="${anyPointMq.url}">
          <reconnection>
              <reconnect frequency="3000" count="3" />
          </reconnection>
          <anypoint-mq:tcp-client-socket-properties connectionTimeout="30000" />
      </anypoint-mq:connection>
  </anypoint-mq:config>

Subscriber flow

<flow name="sfdc-bulk-query-job-subscription" doc:id="7e1e23d0-d7f1-45ed-a609-0fb35dd23e6a" maxConcurrency="1">
        <anypoint-mq:subscriber doc:name="Subscribering Bulk Query Job Details" doc:id="98b8b25e-3141-4bd7-a9ab-86548902196a" config-ref="Anypoint_MQ_Config" destination="${anyPointMq.sfPartnerEds.name}" acknowledgementTimeout="${anyPointMq.ackTimeout}" acknowledgementTimeoutUnit="MINUTES">
            <anypoint-mq:subscriber-type >
                <anypoint-mq:prefetch maxLocalMessages="${anyPointMq.prefecth.maxLocalMsg}" />
            </anypoint-mq:subscriber-type>
        </anypoint-mq:subscriber>
        <json-logger:logger doc:name="INFO - Bulk Job Details have been fetched" doc:id="b25c3850-8185-42be-a293-659ebff546d7" config-ref="JSON_Logger_Config" message='#["Bulk Job Details have been fetched for " ++ payload.object default ""]'>
            <json-logger:content ><![CDATA[#[output application/json ---
payload]]]></json-logger:content>
        </json-logger:logger>
        <set-variable value="#[p('serviceName.sfdcToEds')]" doc:name="ServiceName" doc:id="f1ece944-0ed8-4c0e-94f2-3152956a2736" variableName="ServiceName"/>
        <set-variable value="#[payload.object]" doc:name="sfObject" doc:id="2857c8d9-fe8d-46fa-8774-0eed91e3a3a6" variableName="sfObject" />
        <set-variable value="#[message.attributes.properties.key]" doc:name="key" doc:id="57028932-04ab-44c0-bd15-befc850946ec" variableName="key" />
        <flow-ref doc:name="bulk-job-status-check" doc:id="c6b9cd40-4674-47b8-afaa-0f789ccff657" name="bulk-job-status-check" />
        <json-logger:logger doc:name="INFO - subscribed bulk job id has been processed successfully" doc:id="7e469f92-2aff-4bf4-84d0-76577d44479a" config-ref="JSON_Logger_Config" message='#["subscribed bulk job id has been processed successfully for salesforce " ++ vars.sfObject default "" ++ " object"]' tracePoint="END"/>
    </flow>

After the bulk query job subscriber, I check the status of the job up to 5 times at 1-minute intervals inside an Until Successful scope. It generally exhausts all 5 attempts, after which the message is picked up by the subscriber again and the same process repeats until the job completes. I have seen the Until Successful scope get exhausted more than once for a single job.
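A minimal sketch of the kind of status check described above, for illustration only (it is not the actual `bulk-job-status-check` implementation). The sub-flow name `get-bulk-job-status`, the error type `APP:JOB_NOT_COMPLETE`, and the `payload.state` field are all assumptions:

```xml
<sub-flow name="bulk-job-status-check">
    <!-- maxRetries="5" allows up to 5 retries after the first attempt,
         waiting 60 seconds (60000 ms) between attempts -->
    <until-successful maxRetries="5" millisBetweenRetries="60000">
        <!-- Hypothetical sub-flow that queries Salesforce for the job status -->
        <flow-ref name="get-bulk-job-status" />
        <choice>
            <!-- Assumes the status response exposes the job state in a "state" field -->
            <when expression="#[payload.state != 'JobComplete']">
                <!-- Raising an error is what makes until-successful retry -->
                <raise-error type="APP:JOB_NOT_COMPLETE" description="Bulk job is not complete yet" />
            </when>
        </choice>
    </until-successful>
</sub-flow>
```

With these values, one pass through the scope can hold a message for roughly 5 minutes before the retries are exhausted and the error propagates back to the subscriber flow.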

Once the job's status changes to jobComplete, I fetch the result and send it to an AWS S3 bucket via a MuleSoft system API. Here I also use retry logic because, due to the large volume of data, I always get this message on the first call:

HTTP POST on resource 'https://****//dlb.lb.anypointdns.net:443/api/sys/aws/s3/databricks/object' failed: Remotely closed.

On the second attempt, however, the S3 bucket system API returns a successful response.
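The retry around the upload call might look something like the following sketch. The config name `S3_System_API_Config`, the request path, the retry count, the delay, and the response timeout are placeholders, not values taken from the actual application:

```xml
<!-- Retry the upload if the first call fails (for example with "Remotely closed") -->
<until-successful maxRetries="2" millisBetweenRetries="5000">
    <!-- Hypothetical HTTP request config and path; responseTimeout is in milliseconds -->
    <http:request method="POST"
                  config-ref="S3_System_API_Config"
                  path="/databricks/object"
                  responseTimeout="120000" />
</until-successful>
```

Any time spent in these retries adds to the total time a message stays unacknowledged, so it counts against the 15-minute acknowledgement timeout as well.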

Now the main problems:

  1. Although I am using a standard queue, I have noticed that messages remain in flight indefinitely and are never picked up by the Mule flow/subscriber. In one case, 7 messages were in flight and were still not picked up after many days.
  2. I have set maxConcurrency and the prefetch maxLocalMessages to 1, yet more than one message is taken out of the queue at a time. Please help me understand this.