Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath. exception thrown

55 Views Asked by At

When I try to run a Flink batch job using the Table API, the table environment is not created; instead, an exception is thrown at this line:

TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

I am getting the following exception:

"Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath." As a result, the job fails. Here is the relevant code:

 // Remote execution environment addressed by wslIpAddress / flinkPort (both defined outside this snippet).
 ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(wslIpAddress, flinkPort);

  // The table environment is built from EnvironmentSettings only; `env` is not passed to it.
  // This is the line where the ExecutorFactory exception is reported.
  TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

   // Registers table Flinkdata (all columns STRING, primary key Inde, not enforced)
   // with JDBC connection options pointing at the MySQL table flink_people_data.
   // NOTE(review): these use 'connector.type' / 'connector.url' style keys, while the answer's
   // demo uses 'connector' / 'url' / 'table-name' — confirm which form your Flink version expects.
   tenv.executeSql("CREATE TABLE Flinkdata (\n" +
          "  Inde STRING,\n" +
          "  User_Id STRING,\n" +
          "  First_Name STRING,\n" +
          "  Last_Name STRING,\n" +
          "  Sex STRING,\n" +
          "  Email STRING,\n" +
          "  Phone STRING,\n" +
          "  Date_of_birth STRING, \n" +
          "  Job_Title STRING,\n" +
          "  PRIMARY KEY (Inde) NOT ENFORCED\n" +
          ") WITH (\n" +
          "   'connector.type' = 'jdbc',\n" +
          "   'connector.url' = 'jdbc:mysql://localhost/ruby',\n" +
          "   'connector.table' = 'flink_people_data',\n" +
          "   'connector.username' = 'root',\n" +
          "   'connector.password' = 'passwordd1234'\n" +
          ")");

    // The result of this query is not consumed here (nothing is printed or collected).
    tenv.executeSql("SELECT * FROM Flinkdata");
    // `transactions` is not used again within this snippet.
    Table transactions = tenv.from("Flinkdata");


// NOTE(review): nothing in this snippet defines any operation on `env`, so presumably
// there is nothing for this call to run — confirm whether it is needed at all.
env.execute("ReadWriteToMariaDB"); 

The error seems to occur at this line of the code:

TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); 

How can I solve that?

1

There is 1 best solution below

0
MatrixOrigin On

This is caused by a missing dependency. Add the following dependency (flink-sql-client) to your Maven pom.xml:

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-client</artifactId>
        <version>${flink.version}</version>
</dependency>

The following is the complete dependencies section of the pom file:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-client</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

And here's the demo:

/**
 * Minimal demo: registers a JDBC-backed table with the Flink Table API in batch mode
 * and prints the result of a {@code SELECT *} over it.
 */
public class FlinkSQL {

    /**
     * DDL for the demo table {@code test_t1}, mapped onto the MySQL table of the same name.
     * Connection URL and credentials are hard-coded demo values; replace them for real use.
     */
    private static final String CREATE_TEST_T1 =
            "CREATE TABLE test_t1 (\n" +
            "  id int,\n" +
            "  name STRING,\n" +
            "  age int,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "   'connector' = 'jdbc',\n" +
            "   'url' = 'jdbc:mysql://192.168.110.210:3306/test_db?useSSL=false',\n" +
            "   'table-name' = 'test_t1',\n" +
            "   'username' = 'root',\n" +
            "   'password' = 'root'\n" +
            ")";

    /**
     * Creates a batch-mode table environment, registers {@code test_t1} and prints its rows.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for compatibility with existing callers
     */
    public static void main(String[] args) throws Exception {
        // The table environment is created from EnvironmentSettings alone. No separate
        // ExecutionEnvironment is constructed because none is ever handed to it.
        TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        tenv.executeSql(CREATE_TEST_T1);

        // Run the query and print its result to standard output.
        tenv.executeSql("SELECT * FROM test_t1").print();
    }
}