I have two tables in a Postgres DB that I need to join, and I need to send the resulting output to Kafka. To do that I wrote Java code that uses the Spark framework. A sample of my code looks like this:

Main class:

private static final KafkaService kafkaService = new KafkaService();
    public static void main(String[] args) {
        start();
    }
    private static void start() {
        String dbUrl = "jdbc:postgresql://localhost:5432/postgres?rewriteBatchedStatements=true";
        String query = "(WITH " +

                "all_places AS (" +
                "SELECT " +
                "pt.place_id, " +
                "pt.name, " +
                "FROM place_table pt WHERE pt.loc = 'all'), " +

                "t_id AS (" +
                "SELECT town_table.id, town_table.town_id, town_table.place_id from town_table " +
                "WHERE town_table.place_id IN (SELECT all_places.place_id FROM all_places)) " +

                "SELECT all_towns.id, all_towns.town_id, " +
                "(SELECT JSONB_AGG(related_places) FROM " +
                "(SELECT * FROM all_places " +
                "LEFT JOIN t_id ON t_id.place_id=all_places.place_id " +
                "WHERE t_id.town_id = all_towns.town_id " +
                ") as related_places) AS related_places " +
                "FROM t_id all_towns) as db_data";

        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SQL");
        SparkSession sparkSession = SparkSession.builder()
                .config(conf)
                .getOrCreate();
        Dataset<Row> loadData = sparkSession.read()
                .option("driver","org.postgresql.Driver")
                .option("partitionColumn", "id")
                .option("numPartitions", 9)
                .option("lowerBound", 1)
                .option("upperBound", 100)
                .option("url",dbUrl)
                .option("user","postgres")
                .option("password","admin")
                .option("dbtable",query)
                .format("jdbc")
                .load();
        loadData.persist(StorageLevel.MEMORY_ONLY());

        loadData.foreachPartition((ForeachPartitionFunction<Row>) f -> {
            while(f.hasNext()) {
                Row row = f.next();
                DummySchema event = new DummySchema();
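                // column order from the query: 0 = id, 1 = town_id, 2 = related_places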
                event.setEntityId(row.get(1).toString());
                event.setData(row.get(2) == null ? "" : row.get(2).toString());

                kafkaService.sendMessage(event,"dummyTopic");
            }
        });
        loadData.unpersist();
    }

KafkaService class:

    private final KafkaProducer<String, DummySchema> producer;

    public KafkaService(KafkaProducer<String, DummySchema> producer) {
        this.producer = producer;
    }

    public KafkaService() {
        // Build the producer once; creating a new KafkaProducer for every
        // message opens a new broker connection each time and is very slow.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("auto.register.schemas", false);
        props.put("use.latest.version", true);
        this.producer = new KafkaProducer<>(props);
    }

    public void sendMessage(DummySchema event, String topic) {
        ProducerRecord<String, DummySchema> producerRecord =
                new ProducerRecord<>(topic, event.getEntityId().toString(), event);
        producer.send(producerRecord);
    }

The DummySchema object is generated from an Avro schema file, so it's a standard Kafka setup.

Now, from what I understand, the partitioning takes place once the result set of the query is obtained (i.e. the query gets executed and the result is then partitioned). This is not ideal, since both tables contain a lot of data, so the query could take a long time to execute. A better way would be to partition town_table first and then do the join with place_table. In other words, I don't want to partition a result set; I want to partition town_table and then have the executors do the join.

So instead of the code doing this (which is what is currently happening):

Execute the query -> obtain the result set -> load the result set into the loadData
variable -> partition the result -> send each row of the partitioned data to Kafka in the executors

I want my code to do this:

Partition town_table -> trigger the query ->
have each executor compute the result set for its own partition of the town_table data -> have each executor
send its result set's rows to Kafka.

So instead of the master computing the result set of the query for all town_table rows, the executors would each work on a subset of the rows.
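
For illustration, here is roughly what I have in mind, expressed with the String[] predicates overload of DataFrameReader.jdbc(...). This is only a sketch that reuses dbUrl and query from above: the id ranges are made up, and I assume it only helps if Postgres can push each range down into the scan of town_table instead of evaluating the whole subquery first:

    // Sketch only: each String in `predicates` becomes the WHERE clause of one
    // partition's query, so each executor would ask Postgres to compute the
    // join only for its own slice of town_table ids.
    String[] predicates = {
            "id >= 1 AND id < 34",
            "id >= 34 AND id < 67",
            "id >= 67"
    };
    Properties connProps = new Properties();
    connProps.setProperty("driver", "org.postgresql.Driver");
    connProps.setProperty("user", "postgres");
    connProps.setProperty("password", "admin");

    // One partition is created per predicate instead of deriving ranges
    // from partitionColumn/lowerBound/upperBound.
    Dataset<Row> loadData = sparkSession.read().jdbc(dbUrl, query, predicates, connProps);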

To achieve that, I could instead read two datasets: one for the data from place_table as a broadcast/persisted dataset, and one for the data from town_table, which would be partitioned; each partition would then do a dataset join with the place_table dataset before sending the result to Kafka. My only concern with this approach is that all of the data would be loaded into memory (and given that there are ~10 billion records between the two tables, that could lead to problems).
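
For reference, a rough sketch of that two-dataset approach (functions.broadcast is from org.apache.spark.sql.functions; note that this drops the JSONB_AGG aggregation from my query, which would have to be rebuilt on the Spark side, e.g. with groupBy and collect_list):

    // Load only the needed columns of place_table; broadcast() ships this
    // dataset to every executor so the join runs locally in each partition.
    Dataset<Row> places = sparkSession.read()
            .format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("url", dbUrl)
            .option("user", "postgres")
            .option("password", "admin")
            .option("dbtable", "(SELECT place_id, name FROM place_table WHERE loc = 'all') p")
            .load();

    // Partition town_table itself rather than a joined result set.
    Dataset<Row> towns = sparkSession.read()
            .format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("url", dbUrl)
            .option("user", "postgres")
            .option("password", "admin")
            .option("dbtable", "town_table")
            .option("partitionColumn", "id")
            .option("numPartitions", 9)
            .option("lowerBound", 1)
            .option("upperBound", 100)
            .load();

    // Left join each town_table partition against the broadcast places dataset.
    Dataset<Row> joined = towns.join(
            functions.broadcast(places),
            towns.col("place_id").equalTo(places.col("place_id")),
            "left");

    joined.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
        // build DummySchema events and send them to Kafka, as in the loop above
    });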

So my question is as follows: is it possible to do the partitioning not on the result set of the query but on one of the tables being joined (so that the joining is done in the executors, as explained above)? Or is what I described in the paragraph above the only way?
