did anyone that send batch messages to rocketmq but message tags changed finally?

66 Views Asked by At

I tried to send batch messages to topic with this method below:

public SendResult send(Collection<Message> msgs,
        MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(batch(msgs), messageQueue);
}

but when I consume the topic, the tags changed. More detail,I set "batch" as tags when produce,but get "batchCLUSTERt0" as tags when consume,it changed!

1

There are 1 best solutions below

0
Francis Lee On

i tried on my PC with codes bellow, but can't re-produce what you meet. can you show how your producer codes with set tags.

// producer batch messages
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("xxxxx:9876");
        producer.start();

        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                            "TagA" + i,
                            "OrderID188" + i,
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                    Message msg1 = new Message("TopicTest",
                            "TagA" + i,
                            "OrderID188" + i,
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    List<Message> msgs = new ArrayList<>();
                    msgs.add(msg);
                    msgs.add(msg1);

                    producer.send(msgs);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }



// consuming messages
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("xxxxx:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");

consumer result print