Apache Flink Python UDFs are not working, despite following the AWS repo on PyFlink UDFs


I see that AWS has Kinesis Data Analytics for streaming applications with Apache Flink, and the pyflink library for Python provides a way to create UDFs. In fact, AWS has a repo with examples of PyFlink UDFs.

I have tried following the guide and writing the UDF, but it's not working: I get some Java errors when I use the UDF, and everything works fine if I stop using it. My UDF is very simple since I am just trying to run sample code; it only converts a string to lowercase.

The AWS Kinesis Data Analytics for Flink GitHub sample repo shows how to use a UDF in a stream, and this is the example I am trying to reproduce: https://github.com/aws-samples/pyflink-getting-started/tree/main/pyflink-examples/UDF. But it's not running.

What I have done so far is write two scripts, one with the UDF and one without it:

  1. flink_simple_functional.py This one does not use the UDF and just does a very simple transformation, multiplying a numeric column by 2 and sinking the result to the console. This works fine.
  2. flink_simple.py This one is a copy of the code above, except that it uses the UDF: besides the multiplication by 2, it derives one more column by calling a UDF that simply converts a string column to lowercase.

However, the first script (the one without the UDF) works, and the second one does not, even though the UDF is very simple.

Can someone spot what the mistake might be? Or are UDFs no longer supported for AWS Kinesis?

Below I attach both of my Flink files, as well as a third file called stock.py, which is the one I use to populate the Kinesis input_stream that both Flink scripts read from as their source table, in case it helps to see what that stream looks like. It only has one shard, and I have confirmed it contains data.

At the very end I also attach the errors that the UDF file flink_simple.py produces; they are Java errors.

*Note: my UDF is this simple only for example purposes; I definitely need UDFs for the real code.

*Note: I am running this locally on a Windows 11 desktop machine (not inside a Docker container or anything similar).
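
In case it is relevant: since this runs locally, I wonder whether the Python worker process that executes the UDF is picking up the right interpreter on Windows. Below is a sketch of how I understand the interpreter can be set explicitly through the documented python.executable / python.client.executable options; I have not confirmed whether this is actually related to my error.

import sys

# Hypothetical addition, placed right after table_env is created: point both
# the Python UDF worker and the PyFlink client at the interpreter that runs
# this script (my Anaconda python on Windows).
table_env.get_config().get_configuration().set_string(
    "python.executable", sys.executable)
table_env.get_config().get_configuration().set_string(
    "python.client.executable", sys.executable)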

stock.py file

(this populates the Kinesis stream that both Flink scripts read from as their source)


import datetime
import json
import random
import boto3
import time

STREAM_NAME = "input_stream"


def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")
        time.sleep(0.1)


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis',
                                       aws_access_key_id='<some_key>',
                                       aws_secret_access_key='<some_key>'))
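
For what it's worth, a record put on the stream looks roughly like this (illustrative values only, since they are generated randomly):

{"event_time": "2023-05-01T12:34:56.789012", "ticker": "AAPL", "price": 42.17}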

flink_simple.py file

import boto3
import datetime
import time
import json
import os


from pyflink.table import EnvironmentSettings, TableEnvironment,DataTypes
from pyflink.table.udf import udf

#Create env 
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
statement_set = table_env.create_statement_set()





#start of block: this points to the .jar file that the Kinesis connector needs: /lib/flink-sql-connector-kinesis-1.15.2.jar
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar",
)
# end of block 


# function that simply returns a string which would be the query string to create the source.
# my source is a kinesis stream called input_stream, it's been tested before and it works fine
def create_source_table(table_name):
    return """ CREATE TABLE {0} (
                ticker STRING,
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = 'input_stream',
                'aws.region' = 'us-west-2',
                'scan.stream.initpos' = 'LATEST',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name
    )

# this is to create the "sink" definition which will simply be my console
def create_print_table(table_name):
    query='''CREATE TABLE {0} (
    ticker STRING,
    price DOUBLE,
    event_time TIMESTAMP(3),
    duped_price DOUBLE,
    lowered_ticker STRING,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    )
    WITH
    (
    'connector' = 'print'
    )
    '''.format(table_name)
    return query


#this block creates a UDF and registers it so it can be used later in transformations
@udf(input_types=[DataTypes.STRING()],result_type=DataTypes.STRING())
def extract_decimals(product):
    return product.lower()

table_env.create_temporary_system_function("lower_eugenio",extract_decimals)
# end of block: note that the line above already registers my UDF with the table_env


#here the app starts, after the table_env and the helper functions above have been created
def main():

    #Here I create my source as per the SQL string returned by the `create_source_table` function
    table_env.execute_sql(create_source_table('price'))

    #Here I just make my SQL string for the transformation I want to make
    transformation='''
    SELECT ticker,price,event_time,price*2 as duped_price,lower_eugenio(ticker) as lowered_ticker
    FROM price
    '''

    #Here I create a temporary view to hold the result of the `transformation` query
    table_env.execute_sql('''CREATE TEMPORARY VIEW joined_streams AS {0}'''.format(transformation))

    #Here I create my sink as per the query string returned from the `create_print_table` function 
    table_env.execute_sql(create_print_table('product_table'))

    #Finally here, I dump the temporary table data into the sink table. This is the real sink operation
    result_table=table_env.execute_sql('INSERT INTO product_table SELECT * FROM joined_streams')

    #This is to keep the job running. I am running this locally on Windows 11 on the desktop
    result_table.wait()


#initialize the app
if __name__ == "__main__":
    main()
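
For reference, here is a minimal standalone script that exercises the same UDF registration pattern without Kinesis (datagen source, print sink). The names to_lower, src and snk are placeholders I made up for this sketch; my understanding is that it should print lowercased strings if the local Python UDF harness itself is healthy.

from pyflink.table import EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.udf import udf

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# Same pattern as above: a scalar Python UDF that lowercases a string
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def to_lower(s):
    return s.lower()

t_env.create_temporary_system_function("to_lower", to_lower)

# Built-in datagen source producing random 4-character strings
t_env.execute_sql("""
    CREATE TABLE src (
        ticker STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.ticker.length' = '4'
    )
""")

# Print sink, same connector as in the real scripts
t_env.execute_sql("""
    CREATE TABLE snk (
        ticker STRING,
        lowered STRING
    ) WITH (
        'connector' = 'print'
    )
""")

# Apply the UDF and keep the job running
t_env.execute_sql(
    "INSERT INTO snk SELECT ticker, to_lower(ticker) FROM src"
).wait()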

flink_simple_functional.py file

# THIS CODE IS EXACTLY THE SAME AS THE FILE ABOVE, EXCEPT that in the `transformation` SQL string I omitted the UDF call, and in
# `create_print_table` I removed the "lowered_ticker" column, since the UDF that would have produced it is not used.
# This code works fine; the only difference is that it does not run the UDF.

import boto3
import datetime
import time
import json
import os


from pyflink.table import EnvironmentSettings, TableEnvironment,DataTypes
from pyflink.table.udf import udf

#Create env 
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
statement_set = table_env.create_statement_set()





#start of block: this points to the .jar file that the Kinesis connector needs: /lib/flink-sql-connector-kinesis-1.15.2.jar
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar",
)
# end of block 


# function that simply returns a string which would be the query string to create the source.
# my source is a kinesis stream called input_stream, it's been tested before and it works fine
def create_source_table(table_name):
    return """ CREATE TABLE {0} (
                ticker STRING,
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = 'input_stream',
                'aws.region' = 'us-west-2',
                'scan.stream.initpos' = 'LATEST',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name
    )

# this is to create the "sink" definition which will simply be my console
def create_print_table(table_name):
    query='''CREATE TABLE {0} (
    ticker STRING,
    price DOUBLE,
    event_time TIMESTAMP(3),
    duped_price DOUBLE,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    )
    WITH
    (
    'connector' = 'print'
    )
    '''.format(table_name)
    return query


#this block creates a UDF and registers it so it can be used later in transformations
@udf(input_types=[DataTypes.STRING()],result_type=DataTypes.STRING())
def extract_decimals(product):
    return product.lower()

table_env.create_temporary_system_function("lower_eugenio",extract_decimals)
# end of block: note that the line above already registers my UDF with the table_env


#here the app starts, after the table_env and the helper functions above have been created
def main():

    #Here I create my source as per the SQL string returned by the `create_source_table` function
    table_env.execute_sql(create_source_table('price'))

    #Here I just make my SQL string for the transformation I want to make
    transformation='''
    SELECT ticker,price,event_time,price*2 as duped_price
    FROM price
    '''

    #Here I create a temporary view to hold the result of the `transformation` query
    table_env.execute_sql('''CREATE TEMPORARY VIEW joined_streams AS {0}'''.format(transformation))

    #Here I create my sink as per the query string returned from the `create_print_table` function 
    table_env.execute_sql(create_print_table('product_table'))

    #Finally here, I dump the temporary table data into the sink table. This is the real sink operation
    result_table=table_env.execute_sql('INSERT INTO product_table SELECT * FROM joined_streams')

    #This is to keep the job running. I am running this locally on Windows 11 on the desktop
    result_table.wait()


#initialize the app
if __name__ == "__main__":
    main()

errors from flink_simple.py

py4j.protocol.Py4JJavaError: An error occurred while calling o125.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
        at java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
        at java.util.concurrent.CompletableFuture.get(Unknown Source)
        at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)
        at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
        at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:85)
        at org.apache.flink.table.api.internal.InsertResultProvider.isFirstRowReady(InsertResultProvider.java:71)
        at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:105)
        at java.util.concurrent.CompletableFuture$AsyncRun.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
        at java.util.concurrent.CompletableFuture.get(Unknown Source)
        at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
        ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at java.util.concurrent.CompletableFuture.uniApply(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
        at akka.dispatch.OnComplete.internal(Future.scala:300)
        at akka.dispatch.OnComplete.internal(Future.scala:297)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
        at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
        at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
        at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)    
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
        at akka.actor.ActorCell.invoke(ActorCell.scala:547)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        ... 4 more
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness: C:\Users\eugen\anaconda3\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=11-1 --provision_endpoint=localhost:54288
INFO:root:Starting up Python harness in loopback mode.

        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
        at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92) 
        at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:101)
        at org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:71)    
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)      
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Unknown Source)
        Suppressed: java.lang.NullPointerException
                at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:115)
                at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124)
                at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
                at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1043)
                at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
                at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
                at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:951)
                at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:934)
                at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
                at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
                ... 3 more
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)    
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:654)
        ... 15 more
Caused by: java.lang.IllegalStateException: Process died with exit code 0
        at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
        at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:110)   
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)  
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
        ... 23 more