Flink job not printing expected output from Kafka Source


I am working on a Flink job that reads data from a Kafka source, processes it using CEP patterns, and prints alerts to the console. However, when I run the Flink job using the following command:

./bin/flink run /Users/spartacus/icu-alarm/target/flink-kafka-stroke-risk-1.0-SNAPSHOT-jar-with-dependencies.jar > out.txt

I only see the following output in the out.txt file:

Job has been submitted with JobID 3811369c6f7f14d0eca0a66072550414

The expected behavior is that the Flink job prints alerts to the console, for example:

Stroke Risk Alert: Patient ID - XYZ, Risk Level - 5

However, it seems that the alerts are not being printed.

Code:

package hes.cs63.CEPMonitor;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class StrokeRiskAlarm {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setGroupId("test-consumer-group")
                .setTopics(Arrays.asList("geCEP"))
                //.setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> patientData = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Define a CEP pattern that matches a single high-risk measurement
        Pattern<String, ?> highRiskPattern = Pattern.<String>begin("first")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) {
                        return getValue(value) > 3;
                    }
                });

        PatternStream<String> patternStream = CEP.pattern(
                patientData,
                highRiskPattern
        );

        DataStream<String> strokeRiskAlerts = patternStream.select(new PatternSelectFunction<String, String>() {
            @Override
            public String select(Map<String, List<String>> pattern) throws Exception {
                // Only the "first" stage exists in the pattern above; the original
                // lookups of "middle" and "last" returned null and would throw an
                // NPE as soon as a match occurred.
                String event = pattern.get("first").get(0);
                String userId = event.split(",")[0].trim();
                int risk = getValue(event);
                return "Stroke Risk Alert: Patient ID - " + userId + ", Risk Level - " + risk;
            }
        });

        strokeRiskAlerts.print();

        env.execute("Stroke Risk Alert Job");
    }

    private static int getValue(String value) {
        // Parse the input value and extract the relevant data for risk calculation
        String[] parts = value.split(",");
        String type = parts[2].trim();
        double measurementValue = Double.parseDouble(parts[3].trim());

        // Implement your logic to check high stroke risk for different measurements
        if (type.equals("HR")) {
            // HeartMeasurement risk calculation logic
            int risk = 0;
            risk += measurementValue <= 50 ? 1 : 0;
            risk += measurementValue <= 40 ? 1 : 0;
            risk += measurementValue >= 91 ? 1 : 0;
            risk += measurementValue >= 110 ? 1 : 0;
            risk += measurementValue >= 131 ? 1 : 0;
            return risk;
        } else if (type.equals("SBP")) {
            // BloodPressureMeasurement risk calculation logic
            int risk = 0;
            risk += measurementValue <= 110 ? 1 : 0;
            risk += measurementValue <= 100 ? 1 : 0;
            risk += measurementValue <= 90 ? 1 : 0;
            risk += measurementValue >= 220 ? 3 : 0;
            return risk;
        } else if (type.equals("TEMP")) {
            // TempMeasurement risk calculation logic
            int risk = 0;
            risk += measurementValue <= 36 ? 1 : 0;
            risk += measurementValue <= 35 ? 2 : 0;
            risk += measurementValue >= 38.1 ? 1 : 0;
            risk += measurementValue >= 39.1 ? 1 : 0;
            return risk;
        }
        // Default risk calculation
        return 0;
    }

    // Note: this helper expects three matched events, which only makes sense with a
    // three-stage pattern ("first", "middle", "last"); see the sketch below the class.
    private static int getTotalRisk(String firstValue, String middleValue, String lastValue) {
        // Calculate the total risk as the sum of the three measurements' risks
        int firstRisk = getValue(firstValue);
        int middleRisk = getValue(middleValue);
        int lastRisk = getValue(lastValue);
        return firstRisk + middleRisk + lastRisk;
    }
}
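
For completeness: getTotalRisk expects three matched events, so if a three-event sequence was actually intended, the pattern would also need "middle" and "last" stages. A rough sketch, reusing the same condition for each stage (my assumption, not the original code; stage names chosen to match getTotalRisk):

Pattern<String, ?> threeStagePattern = Pattern.<String>begin("first")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) {
                return getValue(value) > 3;
            }
        })
        .next("middle")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) {
                return getValue(value) > 3;
            }
        })
        .next("last")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) {
                return getValue(value) > 3;
            }
        });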

Project code: https://github.com/IshaanAdarsh/icu-alarm/tree/main

I have verified my code, and I believe the logic for generating alerts is correct. Additionally, I have set up the Kafka source properly.
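
As a quick sanity check, printing the raw stream before the CEP step confirms whether records are being consumed at all (the "raw" label is just an identifier I chose; note this output also ends up in the TaskManager's stdout rather than the client console):

patientData.print("raw");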

What could be the reason behind this issue, and how can I troubleshoot it to ensure that the expected alerts are printed to the console?

2 Answers

Answer from Flaviu Cicio:

The output is being fully redirected into the file by the > out.txt redirection. You can use the tee command to copy the output to the file while still printing it to the console:

./bin/flink run /Users/spartacus/icu-alarm/target/flink-kafka-stroke-risk-1.0-SNAPSHOT-jar-with-dependencies.jar 2>&1 | tee out.txt

Answer from Hasintha Samith Randika:

The output will not print to your console because the job is running inside the Flink cluster, not in the client process that submits it. Open the Flink web UI, select the job you ran, and check the Stdout tab of its TaskManager for the printed alerts. But what is the use case for printing the output to stdout?
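
If the alerts need to end up somewhere durable rather than on stdout, one alternative to print() is writing them to a file sink. A minimal sketch, assuming the flink-connector-files dependency is on the classpath and /tmp/stroke-alerts is a writable path (both are my assumptions, not from the original post):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Write each alert string out as a line of text under /tmp/stroke-alerts
FileSink<String> alertSink = FileSink
        .forRowFormat(new Path("/tmp/stroke-alerts"), new SimpleStringEncoder<String>("UTF-8"))
        .build();

strokeRiskAlerts.sinkTo(alertSink);

Note that in streaming mode the FileSink only finalizes files on checkpoints, so checkpointing would need to be enabled to see completed output files.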