The list of available Apache Nifi processors is extensive, as documented in this post. There is still a need to develop your own; to pull data from a database, to process an uncommon file format, or many other unique situations. So to get you started, we will work through a basic processor that takes a json file as input and a json path as a parameter to place into the contents and an attribute. The full source is hosted on Github.
Setup
Start by creating a simple maven project in your favorite IDE. Then edit the pom.xml.
This pom.xml includes a single plug-in for building a nifi nar, which is similar to a war for nifi, that bundles everything up in a way nifi can unpack. The nifi-api is the only other “required” dependency. The other nifi dependencies are really use full as you will see.
The next important piece is telling nifi which classes to load and register. This is done in a single file located at /src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
The JSON Processor
Now that everything is defined and findable by Apache Nifi, lets build a processor. Define a simple java class as defined in the setup process (rocks.nifi.examples.processors.JsonProcessor).
Tags are useful for finding your processor in the list of processors in the GUI. So in this case in the search box you could just type ‘json’ and your processor will be found. The capability description is also displayed in the processor selection box. Nifi.rocks will make future posts on documenting your custom processors. Finally most processors will just extend the AbstractProcessor, for more complicated tasks it may be required to go a level deeper for the AbstractSessionFactoryProcessor.
@SideEffectFree@Tags({"JSON","NIFI ROCKS"})@CapabilityDescription("Fetch value from json path.")publicclassJsonProcessorextendsAbstractProcessor{
Not really interesting stuff here. Properties will hold all a list of all the available properties tha are exposed to the user. Relationships will hold the relationships the processor will use to direct the flow files. For more details on relationships, properties, and components of an Apache Nifi flow please read the offical developer guide. There is plenty of room to expand on custom validators, but there is a large selection of validators in nifi-processor-utils package.
The init function is called at the start of Apache Nifi. Remember that this is a highly multi-threaded environment and be careful what you do in this space. This is why both the list of properties and the set of relationships are set with unmodifiable collections. I put the getters for the properties and relationships here as well.
The onTrigger method is called when ever a flow file is passed to the processor. For more details on the context and session variables please again refer to the official developer guide.
@OverridepublicvoidonTrigger(ProcessContextcontext,ProcessSessionsession)throwsProcessException{finalProcessorLoglog=this.getLogger();finalAtomicReference<String>value=newAtomicReference<>();FlowFileflowfile=session.get();session.read(flowfile,newInputStreamCallback(){@Overridepublicvoidprocess(InputStreamin)throwsIOException{try{Stringjson=IOUtils.toString(in);Stringresult=JsonPath.read(json,"$.hello");value.set(result);}catch(Exceptionex){ex.printStackTrace();log.error("Failed to read json string.");}}});// Write the results to an attributeStringresults=value.get();if(results!=null&&!results.isEmpty()){flowfile=session.putAttribute(flowfile,"match",results);}// To write the results back out ot flow fileflowfile=session.write(flowfile,newOutputStreamCallback(){@Overridepublicvoidprocess(OutputStreamout)throwsIOException{out.write(value.get().getBytes());}});session.transfer(flowfile,SUCCESS);}
In general you pull the flow file out of session. Read and write to the flow files and add attributes where needed. To work on flow files nifi provides 3 callback interfaces.
InputStreamCallback: For reading the contents of the flow file through a input stream.
Using Apache Commons to read the input stream out to a string. Use JsonPath to attempt to read the json and set a value to the pass on. It would normally be best practice in the case of a exception to pass the original flow file to a Error relation point in the case of an exception.
session.read(flowfile,newInputStreamCallback(){@Overridepublicvoidprocess(InputStreamin)throwsIOException{try{Stringjson=IOUtils.toString(in);Stringresult=JsonPath.read(json,"$.hello");value.set(result);}catch(Exceptionex){ex.printStackTrace();log.error("Failed to read json string.");}}});
OutputStreamCallback: For writing to a flowfile, this will over write not concatenate.
We simply write out the value we recieved in the InputStreamCallback
StreamCallback: This is for both reading and writing to the same flow file. With both the outputstreamcallback and streamcall back remember to assign it back to a flow file. This processor is not in use in the code and could have been. The choice was deliberate to show a way of moving data out of callbacks and back in.
// Write the results to an attributeStringresults=value.get();if(results!=null&&!results.isEmpty()){flowfile=session.putAttribute(flowfile,"match",results);}
Finally every flow file that is generated needs to be deleted or transfered.