Stomp.py not processing one message at a time

I am using stomp.py to connect to a standard ActiveMQ server. I am simulating cases where the receiver crashes, and I want to be able to restart it and have it continue from the message after the one that caused the crash.

I have created two sample scripts:

  • putMessagesToQueue.py - This will put 56 messages into the destination
  • readMessagesFromQueue.py - This will read messages from the destination. If it reads the 6th message it will raise an exception. Each message takes 1 second to process

Steps I take to run the test:

  1. I run putMessagesToQueue.py
  2. I run readMessagesFromQueue.py - it processes 5 messages successfully and an exception is raised on message 6
  3. I terminate readMessagesFromQueue.py (ctrl-c)
  4. I run readMessagesFromQueue.py again

In step 4 I want it to start processing from message 7.

However, I don't see this. If the receiver subscribes with ack='auto' then in step 4 it processes no messages - all the messages are gone from the queue and I have lost 50 messages!

If I use ack='client' or ack='client-individual' then on step 4 it starts again from the beginning then crashes again on message 6.

This seems to suggest that the receiver is not processing messages one at a time; instead it is taking every single message at once and running through each one. I don't want this behaviour because I would like to scale up to running 5 receivers and I want the load distributed. At the moment the first receiver I start takes all the messages and starts churning through them, and receivers 2-5 just wait for new messages. I want the receivers to take messages one at a time instead!

Can anyone give any hints on what I am implementing wrong?

Source

putMessagesToQueue.py

import stomp

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

for x in range(0,5):
  conn.send(body="OK-BEFORE-CRASH", destination=destination)

conn.send(body="CRASH", destination=destination)

for x in range(0,50):
  conn.send(body="OK-AFTER-CRASH", destination=destination)

readMessagesFromQueue.py

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  def __init__(self, processMessage):
    self.processMessage = processMessage
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    self.processMessage(headers, message)

def messageProcessingFunction(headers, message):
  print('Main received a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes receiver")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Receiver terminated")

Update 001

I managed to obtain the desired behaviour described above by changing the receiver to use ack='client-individual' and to manually send ack messages. (See new version below.)

But I am still unable to get the receivers to process one message at a time. This can be demonstrated in the following steps:

  1. I run putMessagesToQueue.py
  2. I run readMessagesFromQueue2.py - it will start processing
  3. In a new terminal run readMessagesFromQueue2.py

The second readMessagesFromQueue2 does nothing until the first one crashes; only then does it start receiving messages. I want both instances of the receiver to read messages from the start.

readMessagesFromQueue2.py

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  conn = None
  def __init__(self, processMessage, conn):
    self.processMessage = processMessage
    self.conn = conn
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    try:
      self.processMessage(headers, message)
    finally:
      self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])

def messageProcessingFunction(headers, message):
  print('Main received a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes receiver")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Receiver terminated")

There is 1 best solution below

After a lot of reading of different docs I found the problem.

ActiveMQ has an option prefetch size - https://svn.apache.org/repos/infra/websites/production/activemq/content/5.7.0/what-is-the-prefetch-limit-for.html

If you have few messages that each take a long time to process, you can set it to 1. This is not appropriate in other situations.
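To illustrate why the prefetch limit decides how work is shared, here is a toy simulation in plain Python. It is not ActiveMQ code: the `simulate` function, the consumer names "A" and "B", and the one-message-per-tick model are all assumptions made up for illustration. Consumer "A" subscribes first and "B" joins two ticks later, as in the two-terminal test from the question. The value 1000 is used as the large prefetch because, as far as I understand the ActiveMQ docs, that is the default for queues.

```python
from collections import deque

def simulate(num_messages, prefetch, late_join_tick=2):
    """Toy model of a broker with a per-consumer prefetch limit.

    Consumer "A" subscribes at tick 0, consumer "B" at late_join_tick.
    Each consumer processes (and acks) one buffered message per tick.
    """
    queue = deque(range(num_messages))   # messages still held by the broker
    buffers = {"A": deque()}             # messages already pushed to each consumer
    processed = {"A": 0, "B": 0}
    tick = 0
    while queue or any(buffers.values()):
        if tick == late_join_tick:
            buffers["B"] = deque()       # second receiver subscribes
        # Broker side: top up every consumer until it holds `prefetch` messages
        progress = True
        while queue and progress:
            progress = False
            for buf in buffers.values():
                if queue and len(buf) < prefetch:
                    buf.append(queue.popleft())
                    progress = True
        # Consumer side: each one handles a single message per tick
        for name, buf in buffers.items():
            if buf:
                buf.popleft()
                processed[name] += 1
        tick += 1
    return processed

print(simulate(56, prefetch=1000))  # {'A': 56, 'B': 0}
print(simulate(56, prefetch=1))     # {'A': 29, 'B': 27}
```

With the large prefetch, all 56 messages are already sitting in the first receiver's buffer before the second one subscribes, so the second gets nothing. With a prefetch of 1 the broker keeps the backlog and hands messages out as each receiver becomes free.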

I can do this in stomp.py with the following line: conn.subscribe(destination=destination, id=1, ack='client-individual', headers={'activemq.prefetchSize': 1})
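Putting that line together with the manual ack from readMessagesFromQueue2.py gives roughly the following sketch. `FakeConnection`, `subscribe_one_at_a_time` and `handle_message` are hypothetical names introduced here so the snippet runs without a broker; with a real `stomp.Connection` the `subscribe` and `ack` calls are the same ones used in the question, and `handle_message` would be called from `on_message` with the `(headers, message)` signature shown there.

```python
class FakeConnection:
    """Hypothetical stand-in for stomp.Connection that only records calls."""
    def __init__(self):
        self.calls = []

    def subscribe(self, **kwargs):
        self.calls.append(("subscribe", kwargs))

    def ack(self, **kwargs):
        self.calls.append(("ack", kwargs))


def subscribe_one_at_a_time(conn, destination, sub_id=1):
    # Ask ActiveMQ to push at most one unacknowledged message to this subscriber
    conn.subscribe(destination=destination, id=sub_id, ack='client-individual',
                   headers={'activemq.prefetchSize': 1})


def handle_message(conn, headers, message, process):
    # Ack even if processing fails, so a restarted receiver skips the bad message
    try:
        process(headers, message)
    finally:
        conn.ack(id=headers["message-id"], subscription=headers["subscription"])


conn = FakeConnection()
subscribe_one_at_a_time(conn, "/queue/testQueueWithCrash")
handle_message(conn, {"message-id": "m-1", "subscription": "1"}, "OK",
               lambda headers, message: None)
print(conn.calls)
```

Because the broker will not send the next message until the current one is acked, each receiver holds only one message at a time and the rest stay on the queue for other receivers.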

So using manual or auto ack was neither here nor there. The key is limiting prefetch to 1.