When using foreachPartition to write RDD data into MySQL, I occasionally lose the MySQL connection


I use a Spark RDD to write data into MySQL with the foreachPartition operator. Inside the operator I set up a connection pool (using ScalikeJDBC), write the data, and then destroy the pool. However, the connection pool occasionally cannot be found, and the log says "Connection pool is not yet initialized. (name:'xxx)". I have no idea why this happens.

All of the data is eventually inserted, but the exception confuses me.
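One plausible explanation, offered as an assumption rather than a diagnosis: several partitions usually run as concurrent tasks inside the same executor JVM, and if the pool is registered under one name in a JVM-wide registry, one task's "destroy the pool" step can remove it while another task is still writing. The sketch below replays that interleaving deterministically. The registry and its add/borrow/close methods are a hypothetical, simplified model written for illustration; they are not ScalikeJDBC's actual API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of a JVM-wide registry of named connection pools.
// NOT ScalikeJDBC's real API: it only mimics "one pool per name, shared by every task in the executor".
class PoolRaceDemo {

  static final Map<String, Object> POOLS = new ConcurrentHashMap<>();

  // What each foreachPartition task does first in the question's design.
  static void add(String name) {
    POOLS.put(name, new Object());
  }

  // What a task does when it needs a connection to write rows.
  static Object borrow(String name) {
    Object pool = POOLS.get(name);
    if (pool == null) {
      throw new IllegalStateException(
          "Connection pool is not yet initialized. (name:'" + name + "')");
    }
    return pool;
  }

  // What each foreachPartition task does last in the question's design.
  static void close(String name) {
    POOLS.remove(name);
  }

  // Replays one possible interleaving of two partition tasks in the same executor JVM.
  static String replay() {
    add("xxx");      // task A sets up the pool
    add("xxx");      // task B sets up the pool under the same name
    borrow("xxx");   // task A writes its rows
    close("xxx");    // task A finishes and destroys the shared pool
    try {
      borrow("xxx"); // task B still has rows to write
      return "ok";
    } catch (IllegalStateException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(replay());
  }
}
```

If something like this is happening, Spark's automatic retry of failed tasks (controlled by spark.task.maxFailures) would be one plausible reason the data still ends up fully inserted.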



Answer by Hari

I believe you have implemented it this way (assuming Java):

dstream.foreachRDD(rdd -> {

  rdd.foreachPartition(partitionOfRecords -> {

    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});
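For comparison, the per-partition pattern in the snippet above can be made a little more robust with try-with-resources, so the connection is closed even if a write throws. This sketch runs it outside Spark: a list of lists stands in for the RDD's partitions, and FakeConnection is a hypothetical placeholder for a real database connection, not a real JDBC class.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of "one connection per partition", simulated without Spark.
class PerPartitionDemo {

  static int opened = 0;
  static int closed = 0;

  // Hypothetical stand-in for a real connection; it just records what was sent.
  static class FakeConnection implements AutoCloseable {
    final List<String> written = new ArrayList<>();

    FakeConnection() { opened++; }

    void send(String record) { written.add(record); }

    @Override
    public void close() { closed++; }
  }

  // Body of one foreachPartition call: open once, write every record, always close.
  static int writePartition(Iterator<String> partitionOfRecords) {
    try (FakeConnection connection = new FakeConnection()) {
      while (partitionOfRecords.hasNext()) {
        connection.send(partitionOfRecords.next());
      }
      return connection.written.size();
    }
  }

  public static void main(String[] args) {
    List<List<String>> partitions = List.of(List.of("a", "b"), List.of("c"), List.of());
    int rows = 0;
    for (List<String> p : partitions) {
      rows += writePartition(p.iterator());
    }
    System.out.println(rows + " rows, " + opened + " opened, " + closed + " closed");
  }
}
```

Each partition pays for opening its own connection, but no task can close a connection another task is using.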

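Instead of calling createNewConnection() in every partition, implement the singleton connection object pattern, so that all tasks running in the same executor JVM reuse one connection, and do not close it at the end of each partition.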

dstream.foreachRDD(rdd -> {

  rdd.foreachPartition(partitionOfRecords -> {

    Connection connection = ConnectionObject.singleTonConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }

  });
});

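// the singleton method should look like this

public class ConnectionObject {

  private static Connection connection = null;

  // synchronized so that concurrent tasks in the same executor JVM
  // cannot both see null and each open their own connection
  public static synchronized Connection singleTonConnection() {

    if (connection == null) {

      /** get a new connection from a Spring data source or JDBC client **/

    }
    return connection;
  }
}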
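An alternative way to write the singleton, sketched here under the assumption that one shared pool per executor JVM is what you want, is the initialization-on-demand holder idiom: the JVM initializes the nested Holder class exactly once, on first access, without locking on every call. A JVM shutdown hook then does the close that was removed from foreachPartition. SharedPool and SharedPoolHolder are hypothetical placeholders, and the threads in main only simulate concurrent partition tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a lazily created resource shared by every task in one JVM (executor).
class SharedPoolHolder {

  static final AtomicInteger CREATED = new AtomicInteger();

  // Hypothetical stand-in for a real connection pool.
  static final class SharedPool {
    SharedPool() { CREATED.incrementAndGet(); }
    void close() { /* release pooled connections here */ }
  }

  // Holder idiom: class initialization is thread-safe and happens once, on first use.
  private static final class Holder {
    static final SharedPool INSTANCE = create();

    private static SharedPool create() {
      SharedPool pool = new SharedPool();
      // Close once when the JVM exits instead of at the end of every partition.
      Runtime.getRuntime().addShutdownHook(new Thread(pool::close));
      return pool;
    }
  }

  static SharedPool get() {
    return Holder.INSTANCE;
  }

  public static void main(String[] args) throws InterruptedException {
    // Eight concurrent "partition tasks" asking for the pool at the same time.
    ExecutorService tasks = Executors.newFixedThreadPool(8);
    Set<SharedPool> seen = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < 8; i++) {
      tasks.submit(() -> { seen.add(get()); });
    }
    tasks.shutdown();
    tasks.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(seen.size() + " instance(s), created " + CREATED.get() + " time(s)");
  }
}
```

Shutdown hooks do not run if the executor process is killed forcibly, so treat the close as best-effort cleanup rather than a guarantee.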