Zeppelin with flink and s3 not able to find s3 filesystem

52 Views Asked by At
  1. I have flink 1.14.6 installed locally on my machine.
  2. In the local flink installation, I have configured flink-s3-fs-hadoop plugin
  3. Using sql-client of flink, I am able to work with flink sql and s3
  4. But when I try to use the same flink installation through apache zeppelin, I get error.

This is how I am running zeppelin.

docker run -u $(id -u) -p 8080:8080 --rm -v /home/pankajb/softwares/flink-1.14.6:/opt/flink \
-e FLINK_HOME=/opt/flink \
--name zeppelin apache/zeppelin:0.10.1

And the error that I am getting is

org.apache.flink.table.api.TableException: Fetch partitions fail.
    at org.apache.flink.table.filesystem.FileSystemTableSource.listPartitions(FileSystemTableSource.java:253)
    at org.apache.flink.table.filesystem.FileSystemTableSource.getOrFetchPartitions(FileSystemTableSource.java:291)
    at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:108)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:716)
    at org.apache.zeppelin.flink.FlinkBatchSqlInterpreter.callInnerSelect(FlinkBatchSqlInterpreter.java:59)
    at org.apache.zeppelin.flink.FlinkSqlInterpreter.callSelect(FlinkSqlInterpreter.java:494)
    at org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:257)
    at org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151)
    at org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
    at org.apache.flink.table.filesystem.FileSystemTableSource.listPartitions(FileSystemTableSource.java:236)
    ... 37 more
0

There are 0 best solutions below