BlazeDS JMS Adapter disconnection


I integrated the BlazeDS messaging system with ActiveMQ.

I configured BlazeDS to create a durable Flex destination using flex.messaging.services.messaging.adapters.JMSAdapter (the configuration is below).

I was able to create a small application that subscribes to the topic and receives messages. Since I need to receive the messages sent while I was offline, I created a durable subscription.

Everything works fine as long as the Flex consumer unsubscribes correctly.

My problem arises when the Flex consumer does not call unsubscribe, for example when I close the browser.

In this situation the topic's durable consumer is still active (I can see it in the ActiveMQ web console) and it keeps consuming the messages.

When I connect again with a new instance of the Flex application, the connection is OK but I do not receive any messages: neither the ones sent while I was away nor the new ones. I cannot even delete the topic using the ActiveMQ web console: javax.jms.JMSException: Durable consumer is in use.

The only way I have found to delete the topic is to restart the web application containing the BlazeDS broker.

Can someone suggest another solution?

This is my BlazeDS configuration:

<adapters>
    <adapter-definition id="actionscript" class="flex.messaging.services.messaging.adapters.ActionScriptAdapter" default="true" />
    <adapter-definition id="jms" class="flex.messaging.services.messaging.adapters.JMSAdapter"/>
</adapters>

...

<destination id="warehouse-topic-jms">
    <properties>
        <jms>
            <destination-type>Topic</destination-type>
            <message-type>javax.jms.ObjectMessage</message-type>
            <connection-factory>java:comp/env/jms/flex/TopicConnectionFactory</connection-factory>
            <destination-jndi-name>java:comp/env/jms/warehouse</destination-jndi-name>
            <delivery-mode>PERSISTENT</delivery-mode>
            <message-priority>DEFAULT_PRIORITY</message-priority>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <initial-context-environment>
                <property>
                    <name>Context.INITIAL_CONTEXT_FACTORY</name>
                    <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
                </property>
                <property>
                    <name>Context.PROVIDER_URL</name>
                    <value>tcp://localhost:61616</value>
                </property>
            </initial-context-environment>
        </jms>
        <server>
            <durable>true</durable>             
        </server>
    </properties>

    <adapter ref="jms"/>
</destination>

This is the Tomcat context that exposes the JNDI resources:

<Resource name="jms/flex/TopicConnectionFactory"
    type="org.apache.activemq.ActiveMQConnectionFactory"
    description="JMS Connection Factory"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    brokerURL="tcp://localhost:61616"
    brokerName="myBroker"/>
<Resource name="jms/warehouse"
    type="org.apache.activemq.command.ActiveMQTopic"
    description="warehouse.topic"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    physicalName="warehouse.topic"/>

And this is my Flex consumer:

<mx:Consumer id="consumer"   
                 channelConnect="consumer_channelConnectHandler(event)" 
                 channelFault="consumer_channelFaultHandler(event)" destination="warehouse-topic-jms"
                 fault="consumer_faultHandler(event)" message="consumer_messageHandler(event)"/>

There are 1 best solutions below

Panciz On

The only solution I found is to use the JMX MBean exposed by BlazeDS to force the JMSAdapter to remove the stale consumer on the ActiveMQ topic. I can then subscribe to the topic again with the same clientID and receive the messages.

Here is the code of the Java method I implemented:

import java.util.Set;
import java.util.TreeSet;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
// BlazeDS management interface for the JMS adapter
import flex.management.runtime.messaging.services.messaging.adapters.JMSAdapterControlMBean;

// DEFAULT_JMX_ADAPTER_URL, DEFAULT_JMSADAPTER_MEAN_QUERY and logger are fields of the enclosing class.
public boolean deleteTopicQueue(String clientId) throws Exception {
    if (clientId == null) {
        throw new Exception("Error removing topic: null name provided");
    }
    clientId = clientId.trim();
    JMXServiceURL url = new JMXServiceURL(DEFAULT_JMX_ADAPTER_URL);
    try (JMXConnector jmxc = JMXConnectorFactory.connect(url, null)) {
        MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
        ObjectName pattern = new ObjectName(DEFAULT_JMSADAPTER_MEAN_QUERY);
        Set<ObjectName> names = new TreeSet<ObjectName>(mbsc.queryNames(pattern, null));

        if (names.isEmpty()) {
            logger.info("Error removing topic " + clientId + ": no JMSAdapter found");
            throw new Exception("No JMSAdapter found");
        }

        // Visit each JMSAdapter MBean once and look for the consumer to remove.
        for (ObjectName ob : names) {
            JMSAdapterControlMBean obProxy = JMX.newMXBeanProxy(mbsc, ob, JMSAdapterControlMBean.class);
            String[] consumerList = obProxy.getTopicConsumerIds();
            for (String consumer : consumerList) {
                if (consumer.trim().equals(clientId)) {
                    logger.info("Removing " + consumer + " from " + ob.getCanonicalName());
                    obProxy.removeConsumer(consumer);
                    return true;
                }
            }
        }
        logger.debug("No consumer with ID " + clientId + " found");
        return false;
    } catch (Exception e) {
        // Log the real cause and keep it attached to the rethrown exception.
        logger.info("Error removing topic " + clientId + ": " + e.getMessage());
        throw new Exception("Error removing topic " + clientId + ": " + e.getMessage(), e);
    }
}
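
The lookup-and-remove pattern used by this method can be tried without a running BlazeDS or ActiveMQ instance. Below is a minimal sketch under stated assumptions: `DemoAdapterMBean` and `DemoAdapter` are hypothetical stand-ins for the BlazeDS `JMSAdapterControlMBean`, the `demo.sketch` object names are invented for the example, and the MBeans are registered on the local platform MBean server instead of being reached through a `JMXConnector`. It uses `JMX.newMBeanProxy` because the stand-in is a standard MBean.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

final class ConsumerRemovalSketch {

    /** Hypothetical stand-in for the BlazeDS JMSAdapterControlMBean interface. */
    public interface DemoAdapterMBean {
        String[] getTopicConsumerIds();
        void removeConsumer(String consumerId);
    }

    /** In-memory implementation; the real adapter would close the JMS consumer. */
    public static class DemoAdapter implements DemoAdapterMBean {
        private final List<String> ids = new ArrayList<>();

        public DemoAdapter(String... initialIds) {
            for (String id : initialIds) {
                ids.add(id);
            }
        }

        @Override
        public synchronized String[] getTopicConsumerIds() {
            return ids.toArray(new String[0]);
        }

        @Override
        public synchronized void removeConsumer(String consumerId) {
            ids.remove(consumerId);
        }
    }

    /** Queries all MBeans matching namePattern and removes the first consumer whose ID equals clientId. */
    static boolean removeConsumer(MBeanServerConnection mbsc, String namePattern, String clientId)
            throws Exception {
        String wanted = clientId.trim();
        Set<ObjectName> names = new TreeSet<>(mbsc.queryNames(new ObjectName(namePattern), null));
        for (ObjectName name : names) {
            DemoAdapterMBean adapter = JMX.newMBeanProxy(mbsc, name, DemoAdapterMBean.class);
            for (String consumer : adapter.getTopicConsumerIds()) {
                if (consumer.trim().equals(wanted)) {
                    adapter.removeConsumer(consumer);
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName first = new ObjectName("demo.sketch:type=JMSAdapter,id=a1");
        ObjectName second = new ObjectName("demo.sketch:type=JMSAdapter,id=a2");
        mbs.registerMBean(new DemoAdapter("client-1"), first);
        mbs.registerMBean(new DemoAdapter("client-2"), second);
        try {
            String pattern = "demo.sketch:type=JMSAdapter,*";
            System.out.println(removeConsumer(mbs, pattern, "client-2")); // true: found on the second adapter
            System.out.println(removeConsumer(mbs, pattern, "client-2")); // false: already removed
        } finally {
            mbs.unregisterMBean(first);
            mbs.unregisterMBean(second);
        }
    }
}
```

The sketch iterates over the matching object names with a for-each loop, so every adapter MBean is visited exactly once even when the consumer is registered on a later one.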