We have to migrate data from an old database to a new one. We use a stored procedure to retrieve the data; it returns a stream of items, each carrying its listId. For each list to be migrated, some checks have to run on the list as a whole. The chunk-oriented processing model does not fit naturally here, since items belonging to the same list can end up in different chunks. On the other hand, as I understand from this similar question, a chunk size of 1 is bad for performance, and the data is too big (~2M lists) to be loaded into a temporary table first. What is the best way to design this?
I tried a job with one step and a chunk size of 1 that reads the records (through a StoredProcedureItemReader), processes them, and writes them to the new database. The intended scenario, however, was a conditional flow with a JobExecutionDecider that decides whether we are starting a new list or just reading another item to add to the list currently being migrated.
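In case it helps, this is roughly how the reader in that step is configured. It is a minimal sketch: the procedure name get_list_items, the Item class, and the column names are made up for the example, not our real schema.

import javax.sql.DataSource
import org.springframework.batch.item.database.StoredProcedureItemReader

data class Item(val listId: Long, val payload: String)

fun storedProcReader(dataSource: DataSource): StoredProcedureItemReader<Item> =
    StoredProcedureItemReader<Item>().apply {
        setName("storedProcReader")        // key used for restart state
        setDataSource(dataSource)
        setProcedureName("get_list_items") // hypothetical procedure name
        setRowMapper { rs, _ ->            // map each cursor row to an Item
            Item(rs.getLong("list_id"), rs.getString("payload"))
        }
    }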
Update:
I read about the AggregateItemReader from the Spring Batch samples, but I can't find a comparable example for a database item reader that groups the read items into a collection before they are processed. This is what I have so far:
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.database.StoredProcedureItemReader

// AggregateItem wraps a single record plus a marker telling us whether it
// closes the current list, as in the Spring Batch samples' AggregateItemReader.
class AggregateItemReader<T>(
    private val itemReader: StoredProcedureItemReader<AggregateItem<T>>
) : ItemReader<List<T>> {

    override fun read(): List<T>? {
        val holder = ResultHolder<T>()
        // pull single rows until the current list is complete or the input ends
        while (process(itemReader.read(), holder)) {
            continue
        }
        return if (!holder.exhausted) holder.records else null
    }

    // Returns true while the current list is still being accumulated.
    private fun process(value: AggregateItem<T>?, holder: ResultHolder<T>): Boolean =
        when {
            value == null -> { holder.exhausted = true; false } // reader exhausted
            value.isFooter -> false                             // end of the current list
            else -> { holder.records.add(value.item); true }    // plain record: accumulate
        }

    private class ResultHolder<T> {
        val records = mutableListOf<T>()
        var exhausted = false
    }
}
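For completeness, this is how I imagine wiring the aggregate reader into a step, assuming Spring Batch 5 builders and reusing the Item class from the reader sketch above; migrateListsStep and checksPass are hypothetical names, not working code. Since each item the step now sees is a whole list, the per-list checks can live in an ItemProcessor, and the chunk size counts lists instead of rows:

import org.springframework.batch.core.Step
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemWriter
import org.springframework.batch.item.database.StoredProcedureItemReader
import org.springframework.transaction.PlatformTransactionManager

// hypothetical placeholder for the per-list checks mentioned in the question
fun checksPass(list: List<Item>): Boolean = list.isNotEmpty()

fun migrateListsStep(
    jobRepository: JobRepository,
    txManager: PlatformTransactionManager,
    delegate: StoredProcedureItemReader<AggregateItem<Item>>,
    writer: ItemWriter<List<Item>>,
): Step =
    StepBuilder("migrateListsStep", jobRepository)
        // each chunk item is a complete list, so 10 means 10 lists per transaction
        .chunk<List<Item>, List<Item>>(10, txManager)
        .reader(AggregateItemReader(delegate))
        // returning null from the processor filters a list out of the migration
        .processor(ItemProcessor { list -> list.takeIf { checksPass(it) } })
        .writer(writer)
        .build()

With this shape the JobExecutionDecider seems unnecessary, because the list boundary is detected inside the reader itself. Is that a reasonable design, or is there a better pattern for grouping items by listId?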