I'm building an AWS OpenSearch domain and an Ingestion Pipeline, and I'm ingesting source documents from Parquet files with the following schema:
query
keyword1 keyword2
keyword3 keyword4 keyword5
I'm trying to split each query on the space separator and ingest every keyword as a separate document into the OpenSearch index.
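To illustrate what I'm after, a row whose query column holds "keyword1 keyword2" should end up as two separate documents, roughly like this (the _id values are only what I expect document_id_field to produce):

```json
[
  { "_id": "keyword1", "query": "keyword1" },
  { "_id": "keyword2", "query": "keyword2" }
]
```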
This is my current pipeline configuration:
version: "2"
my-pipeline:
  source:
    ---- removed -----
  processor:
    - split_string:
        entries:
          - source: "query"
            delimiter_regex: "\\s+"
  sink:
    - opensearch:
        hosts: <my host>
        aws: <aws details>
        index: index_1
        document_id_field: query
I'm getting the following exception while ingesting:
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)
at [Source: UNKNOWN; byte offset: #UNKNOWN]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[jackson-databind-2.15.3.jar:2.15.3]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1752) ~[jackson-databind-2.15.3.jar:2.15.3]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1526) ~[jackson-databind-2.15.3.jar:2.15.3]
at com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromArray(StdDeserializer.java:222) ~[jackson-databind-2.15.3.jar:2.15.3]
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:46) ~[jackson-databind-2.15.3.jar:2.15.3]
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11) ~[jackson-databind-2.15.3.jar:2.15.3]
at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323) ~[jackson-databind-2.15.3.jar:2.15.3]
at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4801) ~[jackson-databind-2.15.3.jar:2.15.3]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2974) ~[jackson-databind-2.15.3.jar:2.15.3]
at com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3438) ~[jackson-databind-2.15.3.jar:2.15.3]
at org.opensearch.dataprepper.model.event.JacksonEvent.mapNodeToObject(JacksonEvent.java:209) ~[data-prepper-api-2.6.1.jar:?]
at org.opensearch.dataprepper.model.event.JacksonEvent.get(JacksonEvent.java:199) ~[data-prepper-api-2.6.1.jar:?]
at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.getDocument(OpenSearchSink.java:468) ~[opensearch-2.6.1.jar:?]
at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doOutput(OpenSearchSink.java:374) ~[opensearch-2.6.1.jar:?]
at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:67) ~[data-prepper-api-2.6.1.jar:?]
at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.11.5.jar:1.11.5]
at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:67) ~[data-prepper-api-2.6.1.jar:?]
at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$5(Pipeline.java:349) ~[data-prepper-core-2.6.1.jar:?]
... 5 more
It looks like the split_string processor replaces the query field with a list, which cannot be used as the document_id_field.
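If I understand the processor correctly, split_string rewrites the source field in place, so an event like { "query": "keyword1 keyword2" } leaves the processor as:

```json
{ "query": ["keyword1", "keyword2"] }
```

The sink then tries to deserialize that array as the (string) document ID, which would explain the MismatchedInputException above.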
Question: How can I split the list of strings produced by the split_string processor into separate events, so that each keyword is ingested as its own document by my OpenSearch sink?