Apache Flink JDBC WHERE and JOIN causes buffering of millions of records


We need to write a job that batch-reads SQL Server data, for example through the JDBC connector, and pushes it to Iceberg. We already stream recent Kafka data, but this job is needed for the historic data.

It has two paths and two main tables: Answers with 900 million rows and AnswerSets with 100 million rows. For AnswerSets we create a Flink table, select all AnswerSets where SurveyId = INT, and write them to Iceberg. We get a few results within seconds and things are smooth.

For Answers, though, we need a join: select all Answers whose AnswerSetID is in the set of AnswerSets with SurveyID = INT. Run directly in the SQL Server console, this query takes a few seconds to complete and yields 36 sets and about 2,000 answers.

In Flink, however, even though we specify batch mode, the job graph seems to buffer all Answers in memory before attempting the join. So far the job goes OOM after buffering 300 million records. It does not do the same buffering for the simpler select from the AnswerSet table.

SELECT a.AnswerId, a.intAnswerSet, a.intQuestionItem, a.strAnswer, a.dtmCreated
FROM mssql_source_answers a
INNER JOIN mssql_source_answerset ans ON a.intAnswerSet = ans.AnswerSetId
WHERE ans.intSurveyId = 500;

We initialise our environment as follows (we primarily compile to a JAR):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the execution mode to BATCH
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// Not relevant in batch mode
// env.enableCheckpointing(Integer.parseInt(parameters.get("checkpoint", "10000")));

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

Then we create our tables:

CREATE TABLE mssql_source_answers
(
    AnswerId BIGINT,
    intAnswerSet INT,
    intQuestionItem INT,
    strAnswer STRING,
    dtmCreated TIMESTAMP(3)
)
WITH ( 
    'connector' = 'jdbc', 
    'table-name' = 'dbo.Answers'
    ...
);


CREATE TABLE mssql_source_answerset
(
    AnswerSetId INT,
    intSurveyId INT,
    dtmCreated TIMESTAMP(3),
    intRespondantId INT,
    dtmCompleted TIMESTAMP(3)
)
WITH ( 
    'connector' = 'jdbc', 
    'table-name' = 'dbo.AnswerSet'
    ...
);
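
For completeness, the elided options in the WITH clauses are the standard JDBC connector settings. Below is an illustrative sketch of what a fuller definition could look like when registered from the JAR via executeSql; the URL, credentials, fetch size, and partition bounds are placeholders, not our real values:

// Placeholder connection details; the real values come from job parameters.
// scan.fetch-size and the scan.partition.* options only control how the scan
// of dbo.Answers is fetched and split across parallel readers; they do not
// push the join filter down to SQL Server.
tableEnv.executeSql(
        "CREATE TABLE mssql_source_answers (" +
        "  AnswerId BIGINT," +
        "  intAnswerSet INT," +
        "  intQuestionItem INT," +
        "  strAnswer STRING," +
        "  dtmCreated TIMESTAMP(3)" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:sqlserver://<host>:1433;databaseName=<db>'," +
        "  'username' = '<user>'," +
        "  'password' = '<password>'," +
        "  'table-name' = 'dbo.Answers'," +
        "  'scan.fetch-size' = '10000'," +
        "  'scan.partition.column' = 'AnswerId'," +
        "  'scan.partition.num' = '16'," +
        "  'scan.partition.lower-bound' = '1'," +
        "  'scan.partition.upper-bound' = '900000000'" +
        ")");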

And we execute a basic query to fetch AnswerSet data and write it to Iceberg. This works just fine.

 // Use the registered MS SQL table
Table filteredSourceTableAnswerSet = tableEnv.sqlQuery(
     "SELECT * FROM mssql_source_answerset WHERE intSurveyId = " + importSurveyId
);

// This should support both batch and streaming now
DataStream<Row> rowDataStreamAnswerSet = tableEnv.toDataStream(filteredSourceTableAnswerSet);

....

// Transform
DataStream<Row> transformedStreamAnswerSet = rowDataStreamAnswerSet
        .map(new MapFunction<Row, Row>() {
            @Override
            public Row map(Row value) throws Exception {
                ....
            }
        });

....

// Write to Iceberg
FlinkSink.forRow(transformedStreamAnswerSet, FlinkSchemaUtil.toSchema(schemaAnswerSet))
            .tableLoader(TableLoader.fromCatalog(catalogLoader, outputTableAnswerSet))
            .distributionMode(DistributionMode.HASH)
            .writeParallelism(parallelism)
            .append();

However, when running the Answers join it just seems to buffer up everything until it goes OOM, whether run from the JAR in batch mode (same code layout as above) or from the Flink sql-client.

....
 // Use the registered MS SQL table
Table filteredSourceTableAnswers = tableEnv.sqlQuery(
       "SELECT a.AnswerId, a.intAnswerSet, a.intQuestionItem, a.strAnswer, a.dtmCreated " +
       "FROM mssql_source_answers a " +
       "INNER JOIN mssql_source_answerset ans ON a.intAnswerSet = ans.AnswerSetId " +
       "WHERE ans.intSurveyId = " + importSurveyId
        );
....

The result always ends up looking like the example screenshot below. Here we ran it for 15 minutes, and so far it had buffered 29 million records. From our understanding it is not the join that is slow, but the Answers source itself: it is read in full before the basic WHERE result from AnswerSets can be merged in.

In this example, we expect only 36 AnswerSet rows and around 2000 answers.

(screenshot of the Flink job graph showing the Answers source buffering records)
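
One way to see where the filter actually lands is to print the optimized plan, for example with explainSql. This is a minimal sketch against the same registered tables (the literal 500 stands in for importSurveyId):

// Print the optimized plan for the join. If the plan only shows the
// intSurveyId filter on the answerset scan and a plain TableSourceScan over
// all of dbo.Answers, the predicate never reaches the Answers JDBC query.
System.out.println(tableEnv.explainSql(
        "SELECT a.AnswerId, a.intAnswerSet, a.intQuestionItem, a.strAnswer, a.dtmCreated " +
        "FROM mssql_source_answers a " +
        "INNER JOIN mssql_source_answerset ans ON a.intAnswerSet = ans.AnswerSetId " +
        "WHERE ans.intSurveyId = 500"));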

SQL Client example:

CREATE TABLE mssql_source_answers
(
    AnswerId BIGINT,
    intAnswerSet INT,
    intQuestionItem INT,
    strAnswer STRING,
    dtmCreated TIMESTAMP(3)
)
WITH ( 
    'connector' = 'jdbc', 
    ...
);


CREATE TABLE mssql_source_answerset
(
    AnswerSetId INT,
    intSurveyId INT,
    dtmCreated TIMESTAMP(3),
    intRespondantId INT,
    dtmCompleted TIMESTAMP(3)
)
WITH ( 
    'connector' = 'jdbc', 
    ...
);


-- instant answer
SELECT * FROM mssql_source_answers WHERE intAnswerSet = 296259;

-- buffers and downloads all Answers until OOM
SELECT a.AnswerId, a.intAnswerSet, a.intQuestionItem, a.strAnswer, a.dtmCreated
FROM mssql_source_answers a
INNER JOIN mssql_source_answerset ans ON a.intAnswerSet = ans.AnswerSetId
WHERE ans.intSurveyId = 500;

1 Answer

Answered by Shaflump:

I spent some time googling and found a similar question, but that person wrote it before attempting an implementation and had not yet hit the issue:

Questions for reading data from JDBC source in DataStream Flink

While I still don't understand why my original code behaves this way, I did reach my goal a different way by following this suggestion:

"use RichParallelSourceFunction where you can do a custom query to your database and get the datastream from it. An SQL with JDBC driver can be fired in the extension of RichParallelSourceFunction class."

So for my Java job it ended up being a new class:


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.types.Row;

// Note: each parallel subtask of this source runs the same query, so with a
// parallelism above 1 the rows are emitted once per subtask.
public class MSSQLAnswersSourceFunction extends RichParallelSourceFunction<Row> {

    private volatile boolean isRunning = true;
    private String jdbcUrl;
    private String username;
    private String password;
    private String surveyId;

    public MSSQLAnswersSourceFunction(String jdbcUrl, String username, String password, String surveyId) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
        this.surveyId = surveyId;
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        // Initialize your database connection here
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
            String query = "SELECT a.AnswerId, a.intAnswerSet, a.intQuestionItem, a.strAnswer, a.dtmCreated " +
                           "FROM dbo.Answers a " +
                           "INNER JOIN dbo.AnswerSet ans ON a.intAnswerSet = ans.AnswerSetId " +
                           "WHERE ans.intSurveyId = ?";
            try (PreparedStatement stmt = connection.prepareStatement(query)) {
                stmt.setString(1, surveyId);
                try (ResultSet resultSet = stmt.executeQuery()) {
                    while (resultSet.next() && isRunning) {
                        Row row = new Row(5); // Adjust the size based on your columns
                        row.setField(0, resultSet.getLong("AnswerId"));
                        row.setField(1, resultSet.getInt("intAnswerSet"));
                        row.setField(2, resultSet.getInt("intQuestionItem"));
                        row.setField(3, resultSet.getString("strAnswer"));
                        row.setField(4, resultSet.getTimestamp("dtmCreated"));
                        ctx.collect(row);
                    }
                }
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

Then in the main method I could do this:

MSSQLAnswersSourceFunction sourceFunction = new MSSQLAnswersSourceFunction(jdbcUrl, username, jdbcPassword, importSurveyId);

DataStream<Row> rowDataStreamAnswers = env.addSource(sourceFunction).setParallelism(parallelism);  

The transformation code then looks much the same, except that I have to reference fields by index instead of by column name, which can be handled with a map.
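
For illustration, a minimal sketch of that map, assuming the field order from the SELECT in MSSQLAnswersSourceFunction and using Flink's named Row support (the variable names are just examples):

// Convert the positional rows emitted by MSSQLAnswersSourceFunction into
// named rows so downstream code can keep addressing fields by name.
DataStream<Row> namedRowStreamAnswers = rowDataStreamAnswers
        .map(new MapFunction<Row, Row>() {
            @Override
            public Row map(Row value) throws Exception {
                Row named = Row.withNames();
                named.setField("AnswerId", value.getField(0));
                named.setField("intAnswerSet", value.getField(1));
                named.setField("intQuestionItem", value.getField(2));
                named.setField("strAnswer", value.getField(3));
                named.setField("dtmCreated", value.getField(4));
                return named;
            }
        });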