I'm trying to connect to SQL Server from a Dataflow job using the JdbcIO step of a pipeline, and I am getting the following error:
2022-12-21T15:06:16.7965222Z SEVERE: 2022-12-21T15:04:31.071Z: java.lang.AbstractMethodError
2022-12-21T15:06:16.7966124Z at net.sourceforge.jtds.jdbc.JtdsConnection.isValid(JtdsConnection.java:2833)
2022-12-21T15:06:16.7967191Z at org.apache.commons.dbcp2.DelegatingConnection.isValid(DelegatingConnection.java:895)
2022-12-21T15:06:16.7968254Z at org.apache.commons.dbcp2.PoolableConnection.validate(PoolableConnection.java:273)
2022-12-21T15:06:16.7969451Z at org.apache.commons.dbcp2.PoolableConnectionFactory.validateConnection(PoolableConnectionFactory.java:644)
2022-12-21T15:06:16.7970625Z at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:106)
2022-12-21T15:06:16.7971833Z at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:652)
2022-12-21T15:06:16.7973522Z at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:534)
2022-12-21T15:06:16.7974830Z at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:734)
2022-12-21T15:06:16.7976091Z at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:1354)
From what I have found online, I need to set a validation query on the data source, but I don't know how to do that from within the Dataflow job. Below is the code I'm using to set up the connection. Can I add a validation query, or is there another workaround?
PCollection<TableRow> Coll = pipeline
    .apply("Connect", JdbcIO.<TableRow>read()
        .withDataSourceConfiguration(
            buildDataSourceConfig(options, URL))
        .withQuery(query)
        .withRowMapper(new JdbcIO.RowMapper<TableRow>() {
            // Map each ResultSet row to a TableRow
            @Override
            public TableRow mapRow(ResultSet rs) throws Exception {
                String ipAddress = rs.getString("IP");
                return trOf(ipAddress);
            }
        }));
private static DataSourceConfiguration buildDataSourceConfig(Options options, String url)
        throws Exception {
    return DataSourceConfiguration
        .create("net.sourceforge.jtds.jdbc.Driver", url)
        .withUsername(user)
        .withPassword(pass);
}
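You could create the DataSource yourself and pass it to DataSourceConfiguration.create(DataSource), which is a static factory method rather than a constructor (https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/jdbc/JdbcIO.DataSourceConfiguration.html).
When creating the DataSource, you should be able to set a validation query with BasicDataSource.setValidationQuery(String), so that the pool runs that query instead of calling Connection.isValid(), the call that fails in the stack trace: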
https://commons.apache.org/proper/commons-dbcp/apidocs/org/apache/commons/dbcp2/BasicDataSource.html#setValidationQuery-java.lang.String-
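
A minimal sketch of that idea, under a few assumptions. The class name JtdsDataSources and its two methods are hypothetical helpers, not part of Beam or of the code in the question, and "SELECT 1" is just one cheap query that SQL Server accepts. As far as I know, DataSourceConfiguration.create(DataSource) expects a Serializable data source, which BasicDataSource is not, so the sketch wraps the pool in a SerializableFunction that can be handed to JdbcIO's withDataSourceProviderFn; this assumes a Beam version recent enough to offer that method.

```java
import javax.sql.DataSource;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.commons.dbcp2.BasicDataSource;

/** Hypothetical helper; not part of Beam or of the original code. */
public final class JtdsDataSources {
  private JtdsDataSources() {}

  /** Builds a DBCP2 pool that validates connections with a query. */
  static BasicDataSource build(String url, String user, String pass) {
    BasicDataSource ds = new BasicDataSource();
    ds.setDriverClassName("net.sourceforge.jtds.jdbc.Driver");
    ds.setUrl(url);
    ds.setUsername(user);
    ds.setPassword(pass);
    // With a validation query set, DBCP2 runs this statement instead of
    // calling Connection.isValid(), where jTDS throws AbstractMethodError.
    ds.setValidationQuery("SELECT 1");
    return ds;
  }

  /** Serializable provider, so the pool itself is created on the worker. */
  static SerializableFunction<Void, DataSource> provider(
      String url, String user, String pass) {
    return unused -> build(url, user, pass);
  }
}
```

In the pipeline, .withDataSourceConfiguration(buildDataSourceConfig(options, URL)) would then be replaced by .withDataSourceProviderFn(JtdsDataSources.provider(url, user, pass)). The provider may be invoked more than once per worker, so this sketch can create several pools; caching the DataSource in a static field is left out for brevity.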