Error Parsing json message through Kafka listener in Mulesoft

51 Views Asked by At

I am trying to parse the message from kafka topic though kafka message listener. I am able to see the message through producer and in the kafka topic. But in consumer flow, the transform message is unable to parse the json properly.

Consumer flow

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:bigquery="http://www.mulesoft.org/schema/mule/bigquery" xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
    xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
    xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/bigquery http://www.mulesoft.org/schema/mule/bigquery/current/mule-bigquery.xsd">
    <flow name="account-consumer-npFlow" doc:id="0d996208-7cb6-42b9-93f2-77ead77d1883" >
        <kafka:message-listener doc:name="Message listener" doc:id="c302fcb3-539c-42d9-8d21-347da8328ecd" config-ref="EDH_Consumer_np_configuration">
            <repeatable-in-memory-stream />
        </kafka:message-listener>
        <ee:transform doc:name="Transform Message" doc:id="28ca3ce2-2c57-4c86-a845-f00d298a6590" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    out : payload.ChangeEventHeader.changeType
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" doc:id="c4bc21b6-98d6-466c-a086-1fd5bf48343b" message="#[%dw 2.0&#10;output application/json&#10;---&#10;payload]" />
    </flow>
</mule>

Error Log:

INFO  2024-02-06 10:24:43,087 [[MuleRuntime].uber.11: [sf-setotcemailtype-procapi-poc].account-producer-np.CPU_LITE @37e0e7a1] [processor: account-producer-np/processors/0/route/1/processors/0; event: 3bc1f370-c50c-11ee-942a-acde48001122] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: {
  "ChangeEventHeader": {
    "commitNumber": 1707236682278494209,
    "commitUser": "0057f000005zHkEAAU",
    "sequenceNumber": 1,
    "entityName": "Account",
    "changeType": "DELETE",
    "changedFields": [
      
    ],
    "changeOrigin": "",
    "transactionKey": "0000295d-4636-8277-714a-0fbcba7f9c9b",
    "commitTimestamp": 1707236682000,
    "recordIds": [
      "001O1000007mPzNIAU"
    ]
  }
}
ERROR 2024-02-06 10:24:43,907 [[MuleRuntime].uber.11: [sf-setotcemailtype-procapi-poc].account-consumer-npFlow.CPU_INTENSIVE @3e250b1c] [processor: ; event: 3c5c37a1-c50c-11ee-942a-acde48001122] org.mule.runtime.core.privileged.exception.DefaultExceptionListener: 
********************************************************************************
Message               : "You called the function 'Value Selector' with these arguments: 
  1: Binary ("ewogICJpZCI6IFsKICAgICIwMDFPMTAwMDAwN21Qek5JQVUiCiAgXSwKICAiYWN0aXZlX19jIjog...)
  2: Name ("ChangeEventHeader")

But it expects one of these combinations:
  (Array, Name)
  (Array, String)
  (Date, Name)
  (DateTime, Name)
  (LocalDateTime, Name)
  (LocalTime, Name)
  (Object, Name)
  (Object, String)
  (Period, Name)
  (Time, Name)

5|  out : payload.ChangeEventHeader.changeType
          ^^^^^^^^^^^^^^^^^^^^^^^^^
Trace:
  at anonymous::main (line: 5, column: 8)" evaluating expression: "%dw 2.0
output application/json
---
{
    out : payload.ChangeEventHeader.changeType
}".
Element               : account-consumer-npFlow/processors/0 @ sf-setotcemailtype-procapi-poc:account-consumer-np.xml:15 (Transform Message)
Element DSL           : <ee:transform doc:name="Transform Message" doc:id="28ca3ce2-2c57-4c86-a845-f00d298a6590">
<ee:message>
<ee:set-payload><![CDATA[
%dw 2.0
output application/json
---
{
    out : payload.ChangeEventHeader.changeType
}
]]></ee:set-payload>
</ee:message>
</ee:transform>
Error type            : MULE:EXPRESSION
FlowStack             : at account-consumer-npFlow(account-consumer-npFlow/processors/0 @ sf-setotcemailtype-procapi-poc:account-consumer-np.xml:15 (Transform Message))

  (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

I am able to parse the payload message just fine in dataWeave playground.

{
  "ChangeEventHeader": {
    "commitNumber": 1707236682278494209,
    "commitUser": "0057f000005zHkEAAU",
    "sequenceNumber": 1,
    "entityName": "Account",
    "changeType": "DELETE",
    "changedFields": [
      
    ],
    "changeOrigin": "",
    "transactionKey": "0000295d-4636-8277-714a-0fbcba7f9c9b",
    "commitTimestamp": 1707236682000,
    "recordIds": [
      "001O1000007mPzNIAU"
    ]
  }
}

with script

%dw 2.0
output application/json
---
{
    out : payload.ChangeEventHeader.changeType
}

returns as expected.

{
  "out": "DELETE"
}

Not sure why transform message is unable to parse it.

1

There are 1 best solutions below

3
aled On

The error is telling you that the DataWeave script is receiving a binary as an input, not a JSON value.

Message               : "You called the function 'Value Selector' with these arguments: 
  1: Binary ("ewogICJpZCI6IFsKICAgICIwMDFPMTAwMDAwN21Qek5JQVUiCiAgXSwKICAiYWN0aXZlX19jIjog...)
  2: Name ("ChangeEventHeader")

Since it is a binary it doesn't has keys. The expression payload.ChangeEventHeader fails because trying to use the single-value selector to get the value of key from a binary is not valid.

You have to check why your Kafka message is a binary.