I am working with Apache NiFi 0.5.1 on a Groovy script that replaces incoming JSON values with the ones contained in a mapping file. The mapping file looks like this (it is a simple .txt):
Header1;Header2;Header3
A;some text;A2
I have started with the following:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
return;
}
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def content = """
{
"field1": "A",
"field2": "A",
"field3": "A"
}"""
def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
builder.content.field1 = "A"
builder.content.field2 = "some text"
builder.content.field3 = "A2"
outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
This first step works just fine, although it is hardcoded and far from ideal. My initial thought was to use ReplaceTextWithMapping to perform the substitutions, but it does not work well with complex mapping files (e.g. multi-column ones). I would like to take this further, but I am not sure how to go about it. First of all, instead of passing in the entire hardcoded JSON, I would like to read the incoming flowfile. How is that possible in NiFi? Before running the script as part of ExecuteScript, I output a .json file with content via UpdateAttribute, where filename = myResultingJSON.json. Furthermore, I know how to load a .txt file with Groovy (String mappingContent = new File('/path/to/file').getText('UTF-8')), but how do I use the loaded file to perform the substitutions so that my resulting JSON looks like this:
{
"field1": "A",
"field2": "some text",
"field3": "A2"
}
Thank you for your help,
I.
EDIT:
First modification to the script does allow me to read from the InputStream:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
return;
}
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
builder.content.field1 = "A"
builder.content.field2 = "some text"
builder.content.field3 = "A2"
outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
I have then moved to testing the approach with the ConfigSlurper and wrote a generic class before injecting the logic into the Groovy ExecuteScript:
class TestLoadingMappings {
static void main(String[] args) {
def content = '''
{"field2":"A",
"field3": "A"
}
'''
println "This is the content of the JSON file" + content
def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
println "This is the content of my builder " + builder
def propertiesFile = new File("D:\\myFile.txt")
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
def config = new ConfigSlurper().parse(props).flatten()
println "This is the content of my config " + config
config.each { k, v ->
if (builder[k]) {
builder[k] = v
}
}
println(builder.toPrettyString())
}
}
I get a groovy.lang.MissingPropertyException, and this is because the mapping is not that straightforward. All fields/properties (from field1 to field3) come into the InputStream with the same value (e.g. "A"), and this means that every time field2, for example, has that value you can be certain that it will be valid for the other two properties. However, I cannot have a mapping field that maps "field2" : "someText" because the actual mapping is driven by the first value in the mapping file. Here is an example:
{
"field1": "A",
"field2": "A",
"field3": "A"
}
In my mapping file I have:
A;some text;A2
However field1 needs mapping to A (first value in the file) or stay the same, if you wish. Field2 needs mapping to the value in the last column (A2) and finally Field3 needs mapping to 'some text' in the middle column.
Can you help with this? Is that something I can achieve with Groovy and ExecuteScript? If needed, I can split the config file into two.
Also, I have had a quick look at the other option (PutDistributedMapCache), and I am not sure I have understood how to load key-value pairs into a distributed map cache. It looks like you would need a DistributedMapCacheClient, and I am not sure whether this is easy to implement.
Thank you!
EDIT 2:
Some more progress: I now have the mapping working, but I am not sure why it fails when reading the second line of the properties file:
"A" someText
"A2" anotherText
class TestLoadingMappings {
static void main(String[] args) {
def content = '''
{"field2":"A",
"field3":"A"
}
'''
println "This is the content of the JSON file" + content
def slurper = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurper)
println "This is the content of my builder " + builder
assert builder.content.field2 == "A"
assert builder.content.field3 == "A"
def propertiesFile = new File('D:\\myTest.txt')
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
println "This is the content of the properties " + props
def config = new ConfigSlurper().parse(props).flatten()
config.each { k, v ->
if (builder.content.field2) {
builder.content.field2 = config[k]
}
if (builder.content.field3) {
builder.content.field3 = config[k]
}
println(builder.toPrettyString())
println "This is my builder " + builder
}
}
}
I am returned with: This is my builder {"field2":"someText","field3":"someText"}
Any idea why?
Thank you so much
EDIT 3 (Moved from below)
I have written the following:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
class TestLoadingMappings {
static void main(String[] args) {
def content =
'''
{"field2":"A",
"field3":"A"
}
'''
def slurper = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurper)
println "This is the content of my builder " + builder
def propertiesFile = new File('D:\\properties.txt')
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
def conf = new ConfigSlurper().parse(props).flatten()
conf.each { k, v ->
if (builder.content[k]) {
builder.content[k] = v
}
println("This prints the resulting JSON :" + builder.toPrettyString())
}
}
}
However, I had to change the structure of the mapping file as follows:
"field1"="substitutionText"
"field2"="substitutionText2"
I have then 'incorporated' the ConfigSlurper into the ExecuteScript script, as follows:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
return;
}
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def slurped = new JsonSlurper().parseText(content)
def builder = new JsonBuilder(slurped)
def propertiesFile = new File('D:\\properties.txt')
Properties props = new Properties()
props.load(new FileInputStream(propertiesFile))
def conf = new ConfigSlurper().parse(props).flatten();
conf.each { k, v ->
if (builder.content[k]) {
builder.content[k] = v
}
}
// Write the JSON once, after the substitutions have been applied
outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
The problem seems to be that I cannot really replicate the logic of the original mapping file with something similar to what I wrote in TestLoadingMappings. As mentioned in my previous comments/edits, the mapping should work in this way:
field2 = if A then substitute to "some text"
field3 = if A then substitute to A2
...
field2 = B then substitute to "some other text"
field3 = B then substitute to B2
and so on.
In a nutshell, the mappings are driven by the incoming value in the InputStream (which varies), which conditionally maps to different values depending on the JSON attribute. Can you please recommend a better way to achieve this mapping via a Groovy/ExecuteScript? I have flexibility in modifying the mapping file, can you see a way where I can change it in order to achieve the desired mappings?
Thanks
I have some examples on how to read in a flow file containing JSON:
http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html
http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html
http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html
You've got the right structure above; basically you can use that "inputStream" variable in the closure to read the incoming flow file contents. If you want to read it in all at once (which you will likely need to do for JSON), you can use IOUtils.toString() followed by a JsonSlurper, as is done in the examples in the links above.
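To try the read/parse/write part outside NiFi, here is a minimal sketch that feeds the same kind of closure from in-memory streams. The name rewrite and the sample JSON are made up for illustration, and it uses Groovy's getText('UTF-8') on the stream instead of IOUtils.toString() only so that it runs without commons-io on the classpath:

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for the body of the StreamCallback closure
def rewrite = { InputStream inputStream, OutputStream outputStream ->
    // Read the whole incoming content as UTF-8 text, then parse it
    def content = inputStream.getText('UTF-8')
    def json = new JsonSlurper().parseText(content)
    // Hardcoded substitution, only to show that the parsed map is mutable
    json.field2 = 'some text'
    outputStream.write(new JsonBuilder(json).toPrettyString().getBytes(StandardCharsets.UTF_8))
}

// Simulate the flow file content with in-memory streams (assumed sample data)
def input = new ByteArrayInputStream('{"field2":"A","field3":"A"}'.getBytes(StandardCharsets.UTF_8))
def output = new ByteArrayOutputStream()
rewrite(input, output)

// Parse what was written so it can be inspected
def result = new JsonSlurper().parseText(output.toString('UTF-8'))
```

Inside ExecuteScript, the same closure body would go into session.write(flowFile, { ... } as StreamCallback), as in your script.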
For your mapping file, especially if your JSON is "flat", you could have a Java Properties file, mapping the name of the field to the new value:
field2=some text
field3=A2
Check out ConfigSlurper for reading in properties files.
Once you have slurped the incoming JSON file and read in the mapping file, you can get at the individual fields of the JSON using array notation instead of direct member notation. So let's say I read the properties into a ConfigSlurper, and I want to overwrite any existing property in my input JSON (called "json" for the example) with the one from the properties file. That might look like the following:
new ConfigSlurper().parse(props).flatten().each { k,v ->
if(json[k]) {
json[k] = v
}
}
You can then continue on with your outputStream.write().
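As a rough, self-contained sketch of that idea (the mapping text and the input JSON are assumed sample data, loaded from strings here rather than from a file or a flow file):

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

// Assumed mapping content; in practice this would be read from the properties file
def mappingText = '''
field2=some text
field3=A2
'''
def props = new Properties()
props.load(new StringReader(mappingText))

// Assumed incoming JSON
def json = new JsonSlurper().parseText('{"field1":"A","field2":"A","field3":"A"}')

// Overwrite only the fields that already exist in the incoming JSON
new ConfigSlurper().parse(props).flatten().each { k, v ->
    if (json.containsKey(k)) {
        json[k] = v
    }
}

def result = new JsonBuilder(json).toPrettyString()
```

This sketch uses containsKey(k) rather than if(json[k]) so that fields holding an empty string, 0 or false are still overwritten; with plain Groovy truth those would be skipped.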
Instead of reading your mappings from a file, you could also load them into a distributed cache via the PutDistributedMapCache processor. You can read from a DistributedMapCacheServer in your ExecuteScript; I have an example here:
http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html
If your mapping is complex, you may want to use the TransformJSON processor, which will be available in the next release of NiFi (0.7.0). The associated Jira case is here:
https://issues.apache.org/jira/browse/NIFI-361
EDIT:
In response to your edits, I didn't realize you had multiple rules for various values. In this case a properties file is probably not the best way to represent the mappings. Instead you could use JSON:
{
"field2": {
"A": "some text",
"B": "some other text"
},
"field3": {
"A": "A2",
"B": "B2"
}
}
Then you can use a JsonSlurper to read in the mappings file. Here is an example of using the above mapping file:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.get();
if (flowFile == null) {
return;
}
def mappingJson = new File('/Users/mburgess/mappings.json').text
flowFile = session.write(flowFile, { inputStream, outputStream ->
def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def inJson = new JsonSlurper().parseText(content)
def mappings = new JsonSlurper().parseText(mappingJson)
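inJson.each {k,v ->
// Keep the original value when no mapping exists for this field or value
inJson[k] = mappings[k]?.get(v) ?: v
}
outputStream.write(new JsonBuilder(inJson).toPrettyString().getBytes(StandardCharsets.UTF_8))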
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
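If you would rather keep the original semicolon-delimited file, the same nested lookup can probably be built from it at load time. The following is only a sketch that runs outside NiFi: the B row is an assumed extra line, and columnToField encodes my assumption (taken from your expected output) that the second column feeds field2 and the third feeds field3:

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Original delimited mapping; the B row is an assumed extra entry for illustration
def mappingText = '''Header1;Header2;Header3
A;some text;A2
B;some other text;B2'''

// Assumption: column index 1 feeds field2 and column index 2 feeds field3
def columnToField = [1: 'field2', 2: 'field3']

// Build [field: [incomingValue: replacement]] from the rows, skipping the header
def mappings = [:]
mappingText.readLines().drop(1).each { line ->
    def cols = line.split(';')
    columnToField.each { idx, field ->
        if (!mappings.containsKey(field)) {
            mappings[field] = [:]
        }
        mappings[field][cols[0]] = cols[idx]
    }
}

// Assumed incoming JSON; fields or values without a mapping are left unchanged
def inJson = new JsonSlurper().parseText('{"field1":"B","field2":"B","field3":"B"}')
def outJson = inJson.collectEntries { k, v ->
    [(k): (mappings[k]?.get(v) ?: v)]
}

def result = JsonOutput.toJson(outJson)
```

Here field1 has no entry in the lookup, so it keeps its incoming value, while field2 and field3 are replaced according to the row whose first column matches.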