Dynamic Flowfile Transfer Scheduling Using Quartz in NiFi Processors


I have been assigned a requirement in my project: to implement a Quartz scheduler within a custom NiFi processor.

As a first step, I need to schedule the transfer of a flowfile from the preceding processor to its designated downstream processor at a scheduled time.
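To make the requirement concrete, here is a minimal sketch in plain Java (no NiFi or Quartz involved) of what "hold an item and release it only once its scheduled time has passed" could look like. The names `HeldItem` and `HoldUntilDue` are hypothetical and only illustrate the idea; they are not part of my processor or of any library.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a held flowfile: an id plus the time it may be released.
final class HeldItem implements Delayed {
    final String id;
    final long releaseAtMillis;

    HeldItem(String id, long releaseAtMillis) {
        this.id = id;
        this.releaseAtMillis = releaseAtMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until release; zero or negative means the item is due.
        return unit.convert(releaseAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

// Hypothetical holder: items go in with a release time and come out only when due.
final class HoldUntilDue {
    private final DelayQueue<HeldItem> held = new DelayQueue<>();

    void hold(String id, long releaseAtMillis) {
        held.add(new HeldItem(id, releaseAtMillis));
    }

    // Returns the id of one item whose release time has passed, or null if none is due yet.
    String releaseOneIfDue() {
        HeldItem item = held.poll();
        return item == null ? null : item.id;
    }
}
```

In this sketch, the release time would come from the scheduling instructions, and `releaseOneIfDue()` would be called periodically to decide whether anything can be passed on.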

I will receive the scheduling instructions as a dynamic parameter and need to schedule the transfer accordingly using the Quartz library.

For now, at this early stage of development, I am hardcoding the scheduling expression (a CRON expression).
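For reference, a Quartz cron expression has six required fields (seconds, minutes, hours, day-of-month, month, day-of-week) plus an optional seventh field for the year. The small sketch below only labels the fields of the expression hardcoded in the processor code further down, `0/10 0/2 * ? * * *`; it does not validate or evaluate it. The class name `QuartzCronFields` is hypothetical and uses only the Java standard library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class QuartzCronFields {
    // Field order used by Quartz cron expressions; the seventh field (year) is optional.
    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"
    };

    // Splits the expression on whitespace and pairs each part with its field name.
    static Map<String, String> label(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException("expected 6 or 7 fields, got " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Prints {seconds=0/10, minutes=0/2, hours=*, dayOfMonth=?, month=*, dayOfWeek=*, year=*}
        System.out.println(label("0/10 0/2 * ? * * *"));
    }
}
```

Read this way, `0/10` in the seconds field means every 10 seconds starting at second 0, and `0/2` in the minutes field means every second minute starting at minute 0.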

-> What I have tried

  • I have created a custom processor and added the Quartz library to it.

  • I then created a class (ScheduleFlowFile) that implements org.quartz.Job and its execute method, which runs the logic at the scheduled time. Inside this class I obtain references to the ProcessSession and ProcessContext of my custom NiFi processor through the JobDataMap, which I populate when defining the JobDetail to be scheduled.

package com.chellyvishal.scheduleQuartz;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class ScheduleFlowFile implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {

        JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();

        ProcessSession session = (ProcessSession) dataMap.get("processSession");
        ProcessContext context = (ProcessContext) dataMap.get("processContext");
        Relationship success   = (Relationship)   dataMap.get("successRelationship");

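        // Note: this session was handed over from onTrigger and may already be
        // committed or otherwise unusable by the time the job fires.
        FlowFile flowFile = session.get();

        // session.get() returns null when no FlowFile is queued
        if (flowFile != null) {
            session.transfer(flowFile, success);
        }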
    }
}

  • Inside the MyProcessor class, within the onTrigger method, I create the JobDetail and the trigger so that the job runs at the instructed schedule:

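import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobDetail;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

public class MyProcessor extends AbstractProcessor {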

    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that have been successfully processed")
            .build();

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that have failed processing")
            .build();
    // ... (rest of the class omitted)

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        // TODO implement

        try {
            scheduler = schedulerFactory.getScheduler();
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }

        long currentTime         = System.currentTimeMillis();
        String uniqueJobIdentifier = "sampleJob_" + currentTime;
        String cronExpression = "0/10 0/2 * ? * * *";

        JobDetail jobDetail = newJob(ScheduleFlowFile.class)
                .withIdentity(uniqueJobIdentifier, "newGroupFirst")
                .build();

        jobDetail.getJobDataMap().put("processSession", session);
        jobDetail.getJobDataMap().put("processContext", context);
        jobDetail.getJobDataMap().put("successRelationship", SUCCESS);

        // A simple trigger that fires once, immediately (startNow); with a repeat count of 0 the 5-second interval is never used
        Trigger trigger = newTrigger()
                .withIdentity("newTrigger"+currentTime, "newGroupFirst")
                .startNow()
                .withSchedule(simpleSchedule()
                        .withIntervalInSeconds(5)
                        .withRepeatCount(0))
                .build();
        // A CRON trigger that schedules the job according to the CRON expression
        // (built here, but only the simple trigger is passed to scheduleJob below)
        Trigger cronTrigger = TriggerBuilder.newTrigger()
                .withIdentity("newCRONTrigger" + currentTime, "newGroupFirst")
                .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                .build();

        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }


    }
}

But this is the error I am facing

I am stuck here and have no idea how to deal with it.
