Python Script using ExecuteStreamCommand

vcovo · Mar 24, 2018

After searching for previous questions and examples relevant to this problem and not finding the answers I was looking for, I decided to submit a question myself.

ExecuteStreamCommand seems like the perfect processor for me due to the following reasons:

  • I can execute any Python script, much as with ExecuteScript, but without Jython. Jython is not an option for me.
  • I can take in FlowFiles. This is necessary as my script is made to consume the output of a previous processor. Furthermore I like the idea of keeping the data under "NiFi management".
  • It writes an "execution status" which will be useful for routing.

In a nutshell, what I'm trying to do with ExecuteStreamCommand is:

  • Ingest the output of a previous processor (a Scrapy spider that outputs a text file with JSON lines to be exact)
  • Call a Python script (e.g. python3 my_script.py)
  • Load the ingested FlowFile in my Python script.
  • Select the content of the FlowFile.
  • Operate on the content of the FlowFile within Python.
  • Output either an updated version of the original FlowFile or create a new one.
  • Continue with my NiFi flow with the updated/new FlowFile.

To be clear, I currently don't understand:

  • How to call the python script (from the ExecuteStreamCommand Processor)
  • How to load up the FlowFile from within Python
  • How to update or create a new FlowFile from within Python
  • How to output the updated FlowFile from Python back to NiFi.

I have come across various examples for ExecuteScript, but unfortunately these don't translate directly to ExecuteStreamCommand.

Thank you in advance. Any advice is appreciated.

Answer

Andy · Mar 28, 2018

Your question says you need to invoke the Python script without using the InvokeScriptedProcessor or ExecuteScript processors because you can't use Jython. Given that requirement, you should still be able to accomplish your goal. It requires some familiarity with the framework, but all of the information below comes from the ExecuteStreamCommand documentation.

Your "I currently don't understand" section:

  • How to call the python script (from the ExecuteStreamCommand Processor)

    • In your ExecuteStreamCommand processor, configure the Command Arguments and Command Path properties with the following:

      • Command Arguments: any flags or args, delimited by ; (e.g. /path/to/my_script.py)
      • Command Path: /path/to/python3
  • How to load up the FlowFile from within Python

    • The flowfile content will be passed via STDIN, so in your Python script, process that data the same way you would normally process STDIN.
  • How to update or create a new FlowFile from within Python
    • NiFi handles the flowfile creation in the framework. Any data your Python script writes to STDOUT will be populated into the content of the resulting flowfile, which is passed to the output stream relationship of the ExecuteStreamCommand processor. Your script does not need to have any awareness of "flowfiles" in this instance. If you were instead using the InvokeScriptedProcessor or ExecuteScript processors, you could use the NiFi scripting API, which is automatically injected into the scripts, to create or update the flowfile object.
  • How to output the updated FlowFile from Python back to NiFi.
    • Again, simply write the desired flowfile content to STDOUT from your script, and (given a return status code of 0) NiFi will generate a new flowfile with that content. If you set the Output Destination Attribute property of ExecuteStreamCommand to a non-null value, NiFi will instead add an attribute with that name to the existing flowfile, containing the output of the script.
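
To make the points above concrete, here is a minimal sketch of what my_script.py could look like. It assumes the incoming flowfile content is JSON lines with one JSON object per line (as in your Scrapy output). The transform function and the "processed" field are hypothetical placeholders for your real logic, and exiting with a non-zero status on malformed lines is just one possible convention, not something NiFi requires.

```python
#!/usr/bin/env python3
"""Hypothetical my_script.py: flowfile content in via STDIN, new content out via STDOUT."""
import json
import sys


def transform(record):
    # Placeholder operation (an assumption for illustration): tag each record.
    record["processed"] = True
    return record


def process(in_stream, out_stream):
    """Read JSON lines from in_stream and write transformed JSON lines to out_stream.

    Returns the number of lines that could not be parsed as JSON.
    """
    bad_lines = 0
    for line in in_stream:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            bad_lines += 1
            # Diagnostics go to STDERR so they never mix with the output content.
            print(f"skipping malformed line: {exc}", file=sys.stderr)
            continue
        out_stream.write(json.dumps(transform(record)) + "\n")
    return bad_lines


if __name__ == "__main__":
    # ExecuteStreamCommand pipes the flowfile content to STDIN and captures STDOUT.
    if process(sys.stdin, sys.stdout) > 0:
        sys.exit(1)  # non-zero exit status, which you can use for routing
```

Because the script only touches STDIN and STDOUT, you can try it outside NiFi first, for example with python3 /path/to/my_script.py < some_spider_output.jl (a hypothetical file name), and check that the printed lines are what you expect the resulting flowfile to contain.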