Error message from worker: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:


While connecting to Memorystore from Dataflow, I am getting the below exception in the Dataflow worker logs.

Error message from worker: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
        redis.clients.jedis.Protocol.process(Protocol.java:140)
        redis.clients.jedis.Protocol.read(Protocol.java:192)
        redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:316)
        redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:243)
        redis.clients.jedis.Jedis.ping(Jedis.java:356)
        org.redis.memorystore.redisconn1.processElement(memorystore.java:39)

Please find the code I am using.

class redisconn1 extends DoFn<String, String>
{
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(redisconn1.class);

        @ProcessElement
        public void processElement(ProcessContext c) throws ParseException, IOException, InterruptedException
        {
                String line = c.element();
                Jedis jedis = new Jedis("12.08.64.4", 6378);
                LOG.info("Connection to server successful");
                LOG.info("Server is running: " + jedis.ping());
                jedis.close();
        }
}
public class redisconn
{
        public static void main(String[] args)
        {
                DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
                options.setProject("heg-dev-rtstdo-0");//project_id
                options.setRunner(DataflowRunner.class);//Runner type
                options.setRegion("us-east4");//region
                options.setTemplateLocation("gs://heg-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/redis_template/TemplateNag");
                options.setGcpTempLocation("gs://heg-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/TempFolder/redis_temp");
                options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
                Pipeline p = Pipeline.create(options);
                
                p.apply(Create.of("Hello world")).apply(ParDo.of(new redisconn1()));

                p.run();

        }
}


There are 2 answers below.

Answer from dre-hh:

For external connections on Dataflow it is best to use beam.sdk.io; for Redis that is org.apache.beam.sdk.io.redis.RedisIO. For enriching data from Redis inside a DoFn you can use the pattern outlined in https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1, shown in the code below; a sketch of wiring it into RedisIO.write() follows after it.

public static class ToKVFunction extends DoFn<String, KV<String, String>>
{
    // Jedis is not serializable, so the connection is opened on the worker, not at construction time
    private transient Jedis redis;

    @StartBundle
    public void startBundle()
    {
        // Open one connection per bundle instead of one per element
        this.redis = new Jedis("12.08.64.4", 6378);
    }

    @ProcessElement
    public void processElement(ProcessContext c)
    {
        String line = c.element();
        // Enrich the element from Redis, e.g.:
        // String value = this.redis.hget("my-hash", line);
        // c.output(KV.of(line, value));
    }

    @FinishBundle
    public void finishBundle()
    {
        // Close the connection when the bundle is done
        this.redis.close();
    }
}
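
For completeness, here is a minimal sketch of how a DoFn like this can be wired into a pipeline together with RedisIO.write(). The host, port, AUTH string and class name are placeholders, not real Memorystore settings; adjust them for your instance.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;

public class RedisWriteSketch
{
    public static void main(String[] args)
    {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // Placeholder connection settings for the Redis/Memorystore instance
        RedisConnectionConfiguration redisConfig = RedisConnectionConfiguration.create()
                .withHost("10.0.0.1")
                .withPort(6379)
                .withAuth("your-auth-string");

        pipeline
            .apply(Create.of("a", "b", "c"))                    // sample input elements
            .apply(ParDo.of(new ToKVFunction()))                // emits KV<String, String>
            .apply(RedisIO.write()
                .withMethod(RedisIO.Write.Method.SADD)          // SADD each key/value pair
                .withConnectionConfiguration(redisConfig));

        pipeline.run().waitUntilFinish();
    }
}

Note that the ToKVFunction above only sketches the enrichment step; it has to emit KV pairs via c.output(...) for RedisIO.write() to have anything to write.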

Answer from Nagesh B Viswanadham:

@dre-hh: I have tried using beam.sdk.io to connect to Memorystore, but I am still getting the same exception.

Please find the code I used.

public class memorystore 
{

    public static void main(String[] args) 
    {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("vz-it-np-gh2v-dev-rtstdo-0");//project_id
        options.setRunner(DataflowRunner.class);//Runner type
        options.setRegion("us-east4");//region
        options.setTemplateLocation("gs://vz-it-np-gh2v-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/redis_template/TemplateNag");
        options.setGcpTempLocation("gs://vz-it-np-gh2v-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/TempFolder/redis_temp");
        options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
        Pipeline pipeline = Pipeline.create(options);

        
        RedisConnectionConfiguration redisConfig = RedisConnectionConfiguration.create()
                                                   .withHost("100.68.64.4")
                                                   .withPort(6378)
                                                   .withAuth("e7cce2ef-7fb1-47ff-bc8f-8bfb5d936b0c");

        PCollection<KV<String, String>> output1=pipeline.apply(Create.of(KV.of("a", "1"), KV.of("b", "2"), KV.of("a", "3"), KV.of("c", "4")));
        
        
        output1.apply(ParDo.of(new ToKVFunction()))
                .apply(RedisIO.write().withMethod(Write.Method.SADD).withConnectionConfiguration(redisConfig));

        pipeline.run().waitUntilFinish();
    }

    // Function to convert your data to KV format (Key-Value)
    public static class ToKVFunction extends DoFn<KV<String, String>, KV<String, String>> 
    {
        @ProcessElement
        public void processElement(ProcessContext c) {
            KV<String, String> input = c.element();
            String key = "GCP";
            String value = "1";
            c.output(KV.of(key, value));
        }
    }
}

Please find the exception I am getting.

Error message from worker: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:178)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:149)
        org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:67)
        org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:54)
        org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:91)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:109)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:267)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:221)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:147)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:127)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        org.apache.beam.sdk.io.redis.RedisIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
        org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
        org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:86)
        org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:68)
        org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
        org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:248)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:80)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:167)
        ... 15 more
        Suppressed: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
                at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
                at org.apache.beam.sdk.io.redis.RedisIO$Write$WriteFn$DoFnInvoker.invokeTeardown(Unknown Source)
                at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:56)
                ... 22 more
        Caused by: java.lang.NullPointerException
                at org.apache.beam.sdk.io.redis.RedisIO$Write$WriteFn.teardown(RedisIO.java:811)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
        redis.clients.jedis.Protocol.process(Protocol.java:140)
        redis.clients.jedis.Protocol.read(Protocol.java:192)
        redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:316)
        redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:243)
        redis.clients.jedis.Jedis.auth(Jedis.java:2625)
        org.apache.beam.sdk.io.redis.RedisConnectionConfiguration.connect(RedisConnectionConfiguration.java:143)
        org.apache.beam.sdk.io.redis.RedisIO$Write$WriteFn.setup(RedisIO.java:684)

Please let me know how to resolve this.