I have 2 tables in a Postgres DB that I need to join and send the resulting output to Kafka. To do that I wrote Java code that uses the Spark framework. A sample of my code looks like this:
Main class:
private static final KafkaService kafkaService = new KafkaService();

public static void main(String[] args) {
    start();
}
private static void start() {
    String dbUrl = "jdbc:postgresql://localhost:5432/postgres?rewriteBatchedStatements=true";
    String query = "(WITH " +
            "all_places AS (" +
            "SELECT " +
            "pt.place_id, " +
            "pt.name " +
            "FROM place_table pt WHERE pt.loc = 'all'), " +
            "t_id AS (" +
            "SELECT town_table.id, town_table.town_id, town_table.place_id FROM town_table " +
            "WHERE town_table.place_id IN (SELECT all_places.place_id FROM all_places)) " +
            "SELECT all_towns.id, all_towns.town_id, " +
            "(SELECT JSONB_AGG(related_places) FROM " +
            "(SELECT * FROM all_places " +
            "LEFT JOIN t_id ON t_id.place_id = all_places.place_id " +
            "WHERE t_id.town_id = all_towns.town_id " +
            ") as related_places) AS related_places " +
            "FROM t_id all_towns) as db_data";
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SQL");
    SparkSession sparkSession = SparkSession.builder()
            .config(conf)
            .getOrCreate();
    Dataset<Row> loadData = sparkSession.read()
            .option("driver", "org.postgresql.Driver")
            .option("partitionColumn", "id")
            .option("numPartitions", 9)
            .option("lowerBound", 1)
            .option("upperBound", 100)
            .option("url", dbUrl)
            .option("user", "postgres")
            .option("password", "admin")
            .option("dbtable", query)
            .format("jdbc")
            .load();
    loadData.persist(StorageLevel.MEMORY_ONLY());
    loadData.foreachPartition(f -> {
        while (f.hasNext()) {
            Row row = f.next();
            // columns are 0-indexed: 0 = id, 1 = town_id, 2 = related_places
            DummySchema event = new DummySchema();
            event.setEntityId(row.get(1).toString());
            event.setData(row.get(2) == null ? "" : row.get(2).toString());
            kafkaService.sendMessage(event, "dummyTopic");
        }
    });
    loadData.unpersist();
}
KafkaService class:
private KafkaProducer<String, DummySchema> producer;

public KafkaService(KafkaProducer<String, DummySchema> producer) {
    this.producer = producer;
}

public KafkaService() {}

public void sendMessage(DummySchema event, String topic) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("auto.register.schemas", false);
    props.put("use.latest.version", true);
    producer = new KafkaProducer<>(props);
    ProducerRecord<String, DummySchema> producerRecord = new ProducerRecord<>(topic, event.getEntityId().toString(), event);
    producer.send(producerRecord);
}
The DummySchema object is generated from an Avro schema file, so it's a standard Kafka setup.
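For context, the Avro schema that DummySchema is generated from boils down to something like this (a simplified sketch - the namespace is a placeholder, and only the entityId and data fields used above are shown):
{
  "type": "record",
  "name": "DummySchema",
  "namespace": "com.example",
  "fields": [
    { "name": "entityId", "type": "string" },
    { "name": "data", "type": "string" }
  ]
}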
Now, from what I understand, the partitioning takes place once the result set of the query is obtained (i.e. the query gets executed and then partitioned). This is not ideal, since both tables have a lot of data in them, so the query could take a long time to execute. A better way would be to partition town_table first and then do the join with place_table. In other words, I don't want to partition the result set; I want to partition town_table and then have the executors do the join.
So instead of the code doing this (which is what currently happens):
Execute the query -> obtain the result set -> load the result set into the loadData variable -> partition the result -> send each row of the partitioned data to Kafka in the executors
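As far as I can tell, each partition ends up issuing the whole subquery to Postgres with only a filter on the partition column wrapped around it, roughly like this (the bounds below are just illustrative):
SELECT "id", "town_id", "related_places" FROM (WITH all_places AS (...) ... FROM t_id all_towns) as db_data WHERE "id" >= 12 AND "id" < 23
so the join between place_table and town_table is still evaluated entirely on the Postgres side.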
I want my code to do this:
Partition town_table -> trigger the query -> have each executor figure out the result set for its partition of town_table -> have each executor send its result set's rows to Kafka.
So instead of the master figuring out the result set of the query for all town_table rows, the executors will each work on a subset of the rows.
To achieve that, I could get 2 datasets instead - one for the data from place_table as a broadcast/persisted dataset, and one for the data from town_table, which would be partitioned; each partition would then do a dataset join with the place_table dataset before sending the info to Kafka. My only concern with this approach is that all the data would be loaded into memory (and given that there are ~10 billion records between the 2 tables, that could lead to problems).
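Roughly, what I have in mind is the following sketch (it reuses the dbUrl and sparkSession from above; the partition bounds are placeholders, the JSONB aggregation from the original query is left out, broadcast comes from org.apache.spark.sql.functions and ForeachPartitionFunction from org.apache.spark.api.java.function):

Dataset<Row> places = sparkSession.read()
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", dbUrl)
        .option("user", "postgres")
        .option("password", "admin")
        .option("dbtable", "(SELECT place_id, name FROM place_table WHERE loc = 'all') as places")
        .load();

Dataset<Row> towns = sparkSession.read()
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", dbUrl)
        .option("user", "postgres")
        .option("password", "admin")
        .option("dbtable", "town_table")
        // partition the town_table read itself instead of a joined result set
        .option("partitionColumn", "id")
        .option("numPartitions", 9)
        .option("lowerBound", 1)
        .option("upperBound", 100)
        .load();

// broadcast() ships the place data to every executor, so the join runs inside
// the executors against each town_table partition
Dataset<Row> joined = towns.join(
        org.apache.spark.sql.functions.broadcast(places),
        towns.col("place_id").equalTo(places.col("place_id")),
        "left");

joined.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
    while (rows.hasNext()) {
        Row row = rows.next();
        // build a DummySchema from the joined row and send it, as in the code above
    }
});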
So my question is as follows: is it possible to do the partitioning not on the result set of the query but on one of the tables being joined (so that the join is done in the executors as explained above)? Or is what I described in the paragraph above the only way?