A DSS (data service) call is made inside a sequence on every iteration of an Iterate mediator. The DSS inserts data into a database table.
Sequence:
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="PushData_Seq" onError="Failure_Sequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<log level="custom">
<property name="*********PushData_Seq" value="is called **********"/>
</log>
<!-- Iterate over JSON Array Elements -->
<iterate expression="$body//rows" id="BigQID">
<target>
<sequence>
<property name="FORCE_ERROR_ON_SOAP_FAULT" scope="default" type="STRING" value="true"/>
<property expression="//f[1]/v/text()" name="Name" scope="default" type="STRING"/>
<property expression="//f[2]/v/text()" name="Age" scope="default" type="STRING"/>
<!-- Frame payload for DSS Call -->
<header name="Action" scope="default" value="urn:data_insert_op"/>
<payloadFactory media-type="xml">
<format>
<Envelope xmlns="http://www.w3.org/2003/05/soap-envelope">
<Body>
<data_insert_op xmlns="http://ws.wso2.org/dataservice">
<Name>$1</Name>
<Age>$2</Age>
</data_insert_op>
</Body>
</Envelope>
</format>
<args>
<arg evaluator="xml" expression="get-property('Name')"/>
<arg evaluator="xml" expression="get-property('Age')"/>
</args>
</payloadFactory>
<!-- calling DSS to push data -->
<call>
<endpoint>
<http method="post" uri-template="http://localhost:8280/services/test_DSS/data_insert_op">
<timeout>
<duration>60000</duration>
</timeout>
<suspendOnFailure>
<errorCodes>-1</errorCodes>
<initialDuration>0</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</http>
</endpoint>
</call>
</sequence>
</target>
</iterate>
<property name="result" scope="default">
<result xmlns=""/>
</property>
<!-- Aggregate for collecting DSS response in order to identify target updated row count -->
<aggregate id="BigQID">
<completeCondition>
<messageCount max="-1" min="-1"/>
</completeCondition>
<onComplete aggregateElementType="root" enclosingElementProperty="result" expression="$body/*[1]">
<log level="custom">
<property name="ON Aggregate SEQ" value="called***"/>
<property expression="fn:count(//*[local-name()='UpdatedRowCount'])" name="*********BigQueryETL-TargetCount:: "/>
</log>
</onComplete>
</aggregate>
<log level="custom">
<property expression="fn:concat('The data_push is Completed and sync_end_time is: ',get-property('SYSTEM_DATE','yyyy-MM-dd HH:mm:ss'))" name="data_push INFO: "/>
</log>
</sequence>
- Added onError="Failure_Sequence" to the sequence, so that when a failure occurs, Failure_Sequence is triggered.
- Also added the FORCE_ERROR_ON_SOAP_FAULT property inside the iterator to trigger Failure_Sequence.
A failure like the one below then occurred during the DSS operation:
{org.wso2.carbon.dataservices.core.DBInOutMessageReceiver} - Error in in-out message receiver
DS Code: DATABASE_ERROR
Nested Exception:-
javax.xml.stream.XMLStreamException: DS Fault Message: Error in 'SQLQuery.processPreNormalQuery': ERROR: value too long for type character varying(15)
DS Code: DATABASE_ERROR
Source Data Service:-
Name: Test_DSS
Location: /home/wso2carbon/wso2ei-6.4.0/wso2/tmp/carbonapps/-1234/1679035873865BigQuery-ETL_CAR_1.0.0-SNAPSHOT.car/Test_DSS_1.0.0/Test_DSS-1.0.0.dbs
Description: N/A
Default Namespace: http://ws.wso2.org/dataservice
Current Request Name: data_insert_op
Current Params: {Name=iCMP-Engineering-1, Age=20}
Nested Exception:-
org.postgresql.util.PSQLException: ERROR: value too long for type character varying(15)
at org.wso2.carbon.dataservices.core.dispatch.SingleDataServiceRequest.processRequest(SingleDataServiceRequest.java:75)
at org.wso2.carbon.dataservices.core.dispatch.DataServiceRequest.dispatch(DataServiceRequest.java:359)
at org.wso2.carbon.dataservices.core.DataServiceProcessor.dispatch(DataServiceProcessor.java:41)
at org.wso2.carbon.dataservices.core.DBInOutMessageReceiver.invokeBusinessLogic(DBInOutMessageReceiver.java:57)
at org.apache.axis2.receivers.AbstractInOutSyncMessageReceiver.invokeBusinessLogic(AbstractInOutSyncMessageReceiver.java:42)
at org.apache.axis2.receivers.AbstractMessageReceiver.receive(AbstractMessageReceiver.java:110)
at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
at org.apache.synapse.transport.passthru.ServerWorker.processNonEntityEnclosingRESTHandler(ServerWorker.java:337)
at org.apache.synapse.transport.passthru.ServerWorker.processEntityEnclosingRequest(ServerWorker.java:383)
at org.apache.synapse.transport.passthru.ServerWorker.run(ServerWorker.java:151)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.xml.stream.XMLStreamException: DS Fault Message: Error in 'SQLQuery.processPreNormalQuery':
I know this error is caused by a value exceeding the character length limit of a specific column (character varying(15)); increasing that limit would resolve it.
Update: DSS response via the TryIt tool:
<?xml version='1.0' encoding='UTF-8'?>
<soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope">
<soapenv:Body>
<soapenv:Fault xmlns:axis2ns589="http://ws.wso2.org/dataservice">
<soapenv:Code>
<soapenv:Value>axis2ns589:DATABASE_ERROR</soapenv:Value>
</soapenv:Code>
<soapenv:Reason>
<soapenv:Text xml:lang="en-US">DS Code: DATABASE_ERROR
Nested Exception:-
javax.xml.stream.XMLStreamException: DS Fault Message: Error in 'SQLQuery.processPreNormalQuery': ERROR: duplicate key value violates unique constraint "dev_table_test_pkey"
Detail: Key (dev_id)=(Engineering-1) already exists.
DS Code: DATABASE_ERROR
Source Data Service:-
Name: WSO2_ZCloud_test_DSS
Location: /home/wso2carbon/wso2ei-6.4.0/wso2/tmp/carbonapps/-1234/1679061962717BigQuery-ETL_CAR_1.0.0-SNAPSHOT.car/WSO2_ZCloud_test_DSS_1.0.0/WSO2_ZCloud_test_DSS-1.0.0.dbs
Description: N/A
Default Namespace: http://ws.wso2.org/dataservice
Current Request Name: dev_inventory_insert_op
Current Params: {dev_name=Engineering-1, dev_id=Engineering-1, }
Nested Exception:-
org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "dev_table_test_pkey"
Detail: Key (dev_id)=(Engineering-1) already exists.
</soapenv:Text>
</soapenv:Reason>
<soapenv:Detail>
<axis2ns588:DataServiceFault xmlns:axis2ns588="http://ws.wso2.org/dataservice">
<axis2ns588:current_params>{dev_name=Engineering-1, dev_id=Engineering-1, }</axis2ns588:current_params>
<axis2ns588:source_data_service>
<axis2ns588:data_service_name>WSO2_ZCloud_test_DSS</axis2ns588:data_service_name>
<axis2ns588:description>N/A</axis2ns588:description>
<axis2ns588:location>/home/wso2carbon/wso2ei-6.4.0/wso2/tmp/carbonapps/-1234/1679061962717BigQuery-ETL_CAR_1.0.0-SNAPSHOT.car/WSO2_ZCloud_test_DSS_1.0.0/WSO2_ZCloud_test_DSS-1.0.0.dbs</axis2ns588:location>
<axis2ns588:default_namespace>http://ws.wso2.org/dataservice</axis2ns588:default_namespace>
</axis2ns588:source_data_service>
<axis2ns588:ds_code>DATABASE_ERROR</axis2ns588:ds_code>
<axis2ns588:nested_exception>org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "dev_table_test_pkey"
Detail: Key (dev_id)=(Engineering-1) already exists.</axis2ns588:nested_exception>
<axis2ns588:current_request_name>dev_inventory_insert_op</axis2ns588:current_request_name>
</axis2ns588:DataServiceFault>
</soapenv:Detail>
</soapenv:Fault>
</soapenv:Body>
</soapenv:Envelope>
My concern is this: even though the DSS throws an error, why was Failure_Sequence not called? It should have been triggered, right?
Please share your thoughts.
Yes, it should. I assume you are not getting a proper SOAP fault from the DSS service (WSO2 will not trigger the fault sequence for HTTP errors). Can you verify that? Also, try isolating the Call mediator, testing it against the DSS response, and checking whether it triggers the fault sequence.
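To check the Call mediator in isolation, a minimal sketch along these lines might help. The sequence name DSS_Call_Test_Seq and the hard-coded test values are assumptions for illustration only; the endpoint, action, and payload shape are copied from the question. Logging the HTTP status code next to the full response should make it easier to see whether the DSS returns a real SOAP fault or only an HTTP error.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Hypothetical test sequence: one DSS call with fixed values, no iterate/aggregate -->
<sequence name="DSS_Call_Test_Seq" onError="Failure_Sequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <property name="FORCE_ERROR_ON_SOAP_FAULT" scope="default" type="STRING" value="true"/>
    <header name="Action" scope="default" value="urn:data_insert_op"/>
    <payloadFactory media-type="xml">
        <format>
            <Envelope xmlns="http://www.w3.org/2003/05/soap-envelope">
                <Body>
                    <data_insert_op xmlns="http://ws.wso2.org/dataservice">
                        <!-- Assumed test value, longer than 15 characters to provoke the database error -->
                        <Name>iCMP-Engineering-1</Name>
                        <Age>20</Age>
                    </data_insert_op>
                </Body>
            </Envelope>
        </format>
        <args/>
    </payloadFactory>
    <call>
        <endpoint>
            <http method="post" uri-template="http://localhost:8280/services/test_DSS/data_insert_op"/>
        </endpoint>
    </call>
    <!-- Reached only if the fault sequence was NOT triggered; shows what actually came back -->
    <log level="full">
        <property name="HTTP_SC" expression="$axis2:HTTP_SC"/>
    </log>
</sequence>
```

If the log after the call prints a fault envelope, the response arrived but was not treated as an error; if Failure_Sequence runs instead, the problem is more likely in how the call is combined with the iterator.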
On a different note, with the way you have implemented the Iterate logic, the iteration will stop if an error occurs. I'm not sure whether that is the intended behavior. If not, you can move the code inside the Iterate mediator to a separate named sequence and attach an onError sequence to that sequence.
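The restructuring described above could be sketched roughly as follows. The names PushData_Item_Seq and PushData_Item_Fault_Seq are hypothetical, and the body of the per-item sequence is meant to hold the mediators from the question unchanged; treat this as an outline under those assumptions, not a tested configuration.

```xml
<!-- Hypothetical per-item sequence: holds the mediators that were inline in the iterate target -->
<sequence name="PushData_Item_Seq" onError="PushData_Item_Fault_Seq" xmlns="http://ws.apache.org/ns/synapse">
    <property name="FORCE_ERROR_ON_SOAP_FAULT" scope="default" type="STRING" value="true"/>
    <!-- The property, header, payloadFactory and call mediators from the question go here -->
</sequence>

<!-- Hypothetical per-item fault handler: logs the failure for a single row -->
<sequence name="PushData_Item_Fault_Seq" xmlns="http://ws.apache.org/ns/synapse">
    <log level="custom">
        <property name="PushData item failed" expression="get-property('ERROR_MESSAGE')"/>
    </log>
</sequence>

<!-- In PushData_Seq, the iterate target then only references the named sequence -->
<iterate expression="$body//rows" id="BigQID">
    <target sequence="PushData_Item_Seq"/>
</iterate>
```

With this layout, an error raised for one row is handled by the fault sequence attached to the per-item sequence instead of the one attached to PushData_Seq.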
Update
Sample filter condition.
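One way such a filter could look, as a rough sketch: it is placed after the Call mediator and checks whether the response body contains a Fault element. The property names DSS_FAULT_CODE and DSS_INSERT are made up for illustration; the ds_code element name is taken from the fault response shown in the question, and local-name() is used so the check does not depend on namespace prefixes.

```xml
<!-- Placed right after the <call> mediator inside the iterate target -->
<filter xpath="$body/*[local-name()='Fault']">
    <then>
        <!-- The DSS returned a SOAP fault for this row -->
        <log level="custom">
            <property name="DSS_FAULT_CODE" expression="$body//*[local-name()='ds_code']"/>
        </log>
        <!-- Handle the failed row here, for example by invoking the existing fault sequence -->
        <sequence key="Failure_Sequence"/>
    </then>
    <else>
        <log level="custom">
            <property name="DSS_INSERT" value="succeeded"/>
        </log>
    </else>
</filter>
```

Checking the payload explicitly like this does not rely on FORCE_ERROR_ON_SOAP_FAULT, so it also covers the case where the fault arrives but is not escalated to the onError sequence.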