1. General

1.1 HORTONWORKS DATAFLOW (HDF™)

HDF makes streaming analytics faster and easier, by enabling accelerated data collection, curation, analysis and delivery in real-time, on-premises or in the cloud through an integrated solution with Apache NiFi, Kafka and Storm.

1.2 What is Apache NiFi

Apache NiFi is a real-time, integrated data logistics and simple event processing platform.
It automates the movement of data between disparate data sources and systems, making data ingestion fast, easy and secure.
Reference: Apache NiFi Expression Language Guide

1.3 What is Elasticsearch

An open-source, distributed, RESTful search engine, comparable to Apache Solr.

1.4 Background

This project is named HDF-experimentations. The goal is to understand how Apache NiFi works and how we can use it in production with large volumes of data: we get data from PostgreSQL and put it into Elasticsearch. We also want to compare the performance of Apache NiFi with that of Apache Camel.

The official Apache NiFi site offers two release versions, 1.0.0 and 0.7.1. Hortonworks provides a distribution named HDF 2.0, which includes Apache NiFi 1.0.0 and is open source, so we will use HDF 2.0 by default.

2. Environment

  • Ubuntu 16.04 (X64)
  • java version "1.8.0_111"
  • HDF 2.1.1 (nifi-1.1.0.2.1.1.0-2); Hortonworks released this new version on 19/12/2016.
  • Elasticsearch 2.4.1 (Kopf plugin)
  • Kibana 4.6.2 (Sense plugin)

2.1 Installing HDF 2.0 (Standalone)

wget http://public-repo-1.hortonworks.com/HDF/2.0.0.0/HDF-2.0.0.0-579.tar.gz
tar -xvf HDF-2.0.0.0-579.tar.gz
cp -r HDF-2.0.0.0-579 your/path
# add NiFi to the PATH (append with ">>"; a single ">" would overwrite ~/.bashrc)
echo 'export PATH=your/path/HDF-2.0.0.0-579/nifi/bin:$PATH' >> ~/.bashrc
# reload the shell configuration
source ~/.bashrc

Start NiFi with the command nifi.sh start; the web UI is then available in a browser at http://localhost:8080/nifi

2.2 Installing Elasticsearch

wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.4.1/elasticsearch-2.4.1.zip
unzip elasticsearch-2.4.1.zip
cp -r   elasticsearch-2.4.1 your/path
# add Elasticsearch to the PATH (append with ">>"; a single ">" would overwrite ~/.bashrc)
echo 'export PATH=your/path/elasticsearch-2.4.1/bin:$PATH' >> ~/.bashrc
# reload the shell configuration
source ~/.bashrc

Run bin/elasticsearch on Unix or bin\elasticsearch.bat on Windows, then run curl -X GET http://localhost:9200/
If you get back a JSON response describing the node, Elasticsearch was installed successfully.

2.2.1 Kopf plugin for ElasticSearch

Kopf is a simple web administration tool for elasticsearch written in JavaScript + AngularJS + jQuery + Twitter bootstrap.

Reference: lmenezes/elasticsearch-kopf on GitHub

Run this command:

plugin install lmenezes/elasticsearch-kopf/2.0

Point your browser at http://localhost:9200/_plugin/kopf/

2.3 Installing Kibana

wget https://download.elastic.co/kibana/kibana/kibana-4.6.2-linux-x86_64.tar.gz
tar -xvf kibana-4.6.2-linux-x86_64.tar.gz
cp -r kibana-4.6.2-linux-x86_64 your/path
# add Kibana to the PATH (append with ">>"; a single ">" would overwrite ~/.bashrc)
echo 'export PATH=your/path/kibana-4.6.2-linux-x86_64/bin:$PATH' >> ~/.bashrc
# reload the shell configuration
source ~/.bashrc

Open config/kibana.yml in an editor and set elasticsearch.url to point at your Elasticsearch instance.

Point your browser at http://localhost:5601 to see the web UI.

2.3.1 Sense plugin for Kibana

Sense is a handy console for interacting with the REST API of Elasticsearch.

kibana plugin --install elastic/sense

2.4 Put the data in ElasticSearch/Apache Solr using Twitter API

  • MyElasticSolr.xml
  • MyTwitterTemplate.xml

2.4.1 Geo Map(Kibana)

We can reformat the input JSON with JoltTransformJSON. For more information about Jolt, there is a demo available at jolt-demo.appspot.com: you can paste in JSON input data and a spec, and it will post the data to the server and run the transform.

  • latitude: -90 ~ 90
  • longitude: -180 ~ 180

Warning: in Elasticsearch, the array representation of a geo-point is [lon,lat], NOT [lat,lon].

Everybody gets caught at least once: string geo-points are "latitude,longitude" , while array geo-points are [longitude,latitude]—the opposite order!

Originally, both strings and arrays in Elasticsearch used latitude followed by longitude. However, it was decided early on to switch the order for arrays in order to conform with GeoJSON.

The result is a bear trap that captures all unsuspecting users on their journey to full geolocation nirvana.
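
As a minimal sketch of the ordering rule above, the hypothetical helper below (it is not part of Elasticsearch or NiFi) converts a "latitude,longitude" string into the [longitude, latitude] array form and checks the ranges listed earlier.

```python
def latlon_string_to_array(point: str) -> list:
    """Convert a "lat,lon" string geo-point into a [lon, lat] array geo-point."""
    lat_text, lon_text = point.split(",")
    lat, lon = float(lat_text), float(lon_text)
    # Reject values outside the valid ranges before swapping the order.
    if not -90.0 <= lat <= 90.0:
        raise ValueError(f"latitude out of range: {lat}")
    if not -180.0 <= lon <= 180.0:
        raise ValueError(f"longitude out of range: {lon}")
    return [lon, lat]


if __name__ == "__main__":
    print(latlon_string_to_array("45.84,5.43"))
```

A range check like this catches some swapped pairs (a "latitude" of 120, for instance), but not all of them, so it is only a safety net.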

Mapping is very important for a search engine like Elasticsearch or Apache Solr.

These requests can be run in Sense.

GET twitter/_search
{
  "query": {
    "match_all": {}
  }
}

DELETE twitter
PUT twitter

GET /twitter/_search/template

GET _template

DELETE _template/twitter

PUT _template/twitter
{
  "template": "twitter",
  "order": 1,
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "tweet": {
      "_all": {
        "enabled": true
      },
      "dynamic_templates": [
        {
          "message_field": {
            "match": "message",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed",
              "omit_norms": true
            }
          }
        },
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed",
              "omit_norms": true,
              "fields": {
                "raw": {
                  "type": "string",
                  "index": "not_analyzed",
                  "ignore_above": 256
                }
              }
            }
          }
        }
      ],
      "properties": {
        "text": {
          "type": "string"
        },
        "place": {
          "properties": {
            "country": {
              "index": "not_analyzed",
              "type": "string"
            },
            "name": {
              "index": "not_analyzed",
              "type": "string"
            }
          }
        },
        "coordinates": {
          "properties": {
            "coordinates": {
              "type": "geo_point"
            },
            "type": {
              "type": "string"
            }
          }
        }
      }
    }
  }
}

2.4.2 Retry/Failure ?

In a NiFi data-flow, it’s a common best practice to draw failure relationships so that they loop back to the feeding processor rather than terminating them. This way, you don’t lose data when it fails to process and when it does fail, you can see that it has gone down the failure relationship and can troubleshoot the issue. However, this can also result in an infinite loop...

We can use an UpdateAttribute processor with three rules configured (to retry a failure up to 3 times). The rules are added through the Advanced tab of the processor, and they must be written in the Apache NiFi Expression Language.

2.4.3 Another solution from the Internet

This is definitely something you can do. So let's use your example.
You have a PutFile processor and if a given object routes to failure
three times you want to be able to detect that and do something else.
Here is how:

  • Before PutFile use an UpdateAttribute processor configured with a
    single property 'times-through-here' and a value of '0'

  • After PutFile's failure relationship route the results to another
    UpdateAttribute processor. In it set an attribute property
    'times-through-here' with a value of '${times-through-here:plus(1)}'

  • After that UpdateAttribute route the data to a RouteOnAttribute.
    The route on attribute should have a property of 'too many' with a
    value of '${times-through-here:ge( 3 )}'. For the 'too many'
    relationship this is your stuff that has failed too many times route
    it wherever you like or auto terminate. For the 'no match'
    relationship you can route it back to the PutFile

Using this same pattern and expression language capability you can
imagine a variety of powerful things you can do to control it.

If you have any questions let us know. We could turn this into a good
FAQ type entry complete with screenshots and templates.

We need to get our expression language docs on the website. In the
mean time you can view them in the running app through the help
screen.
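
The counter pattern described in this answer can be sketched outside NiFi. The Python below is only an illustration under assumptions: the FlowFile is modelled as a plain dict of attributes, and `processor` is a hypothetical callable standing in for PutFile that returns True on success.

```python
def run_with_retries(flowfile: dict, processor, max_failures: int = 3) -> str:
    """Simulate the UpdateAttribute / RouteOnAttribute retry loop."""
    # UpdateAttribute before the processor: times-through-here = 0
    flowfile["times-through-here"] = 0
    while True:
        if processor(flowfile):
            return "success"
        # UpdateAttribute on the failure relationship:
        # ${times-through-here:plus(1)}
        flowfile["times-through-here"] += 1
        # RouteOnAttribute 'too many': ${times-through-here:ge( 3 )}
        if flowfile["times-through-here"] >= max_failures:
            return "too many"
        # 'no match' relationship: loop back to the processor


if __name__ == "__main__":
    always_fails = lambda ff: False
    print(run_with_retries({}, always_fails))
```

Because the counter travels with the FlowFile as an attribute, the loop ends after a bounded number of failures instead of cycling forever.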

Examples for Apache NiFi/Zeppelin

Examples for Kibana

2.5 Build custom processor and debug

2.5.1 Custom processor with maven and eclipse

  • Command to create an Apache Nifi project
mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=0.2.1 -DnifiVersion=0.2.1
  • Sample inputs to generate a maven project
Define value for property 'groupId': : com.hortonworks
Define value for property 'artifactId': : debug-processor
Define value for property 'version':  1.0-SNAPSHOT: :
Define value for property 'artifactBaseName': : debug
Define value for property 'package':  com.hortonworks.processors.debug: :
  • This will create an archetype maven project for a custom processor with the package name, artifactId, etc specified above.

  • Import to Eclipse

File > Import > Maven > Existing Maven Projects, and name it debug-processor

  • You should now see the project structure in Eclipse

  • To run a Maven build

    • In Eclipse, under Package Explorer, select debug-processor and then click Run Configurations on the toolbar
  • Then double-click 'Maven Build'. It will prompt you for the configuration. Enter the values below

    • Name: nifi-debug
    • Base dir: /home/qfdk/workspace/debug-processors
    • Under 'Goals': clean package
    • Under Maven Runtime: (scroll down to see this option).
    • Configure > Add > click ‘Directory’ and navigate to mvn install: /usr/share/apache-maven > OK > Finish > Select apache-maven > Apply > OK

  • To compile with maven

  • To run Junit to confirm processor is working correctly
    • In Eclipse, under Package Explorer select nifi-debug-processors and then click:
      • Run > Run as > JUnit test
  • You should see the tests pass (success)

  • Then copy nifi-debug-nar-1.0-SNAPSHOT.nar into the lib directory of Apache NiFi and restart the NiFi server; your processor should appear in the processor list.

2.5.2 Debug (Remote mode)

In NiFi's conf directory, bootstrap.conf contains a commented-out line like this:

#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000

Uncomment that line and restart NiFi, the Java process will be listening for a debug connection on port 8000. If you want the process to wait for a connection before starting, you can also set suspend=y.
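
If you would rather script this edit than do it by hand, here is a minimal sketch. The function name is hypothetical, and it assumes the commented line starts exactly with `#java.arg.debug=` as shown above; it works on the file's text and leaves every other line untouched.

```python
def enable_remote_debug(bootstrap_text: str) -> str:
    """Uncomment the java.arg.debug line in the text of bootstrap.conf."""
    lines = []
    for line in bootstrap_text.splitlines():
        # Only the debug argument is uncommented; other comments stay as-is.
        if line.startswith("#java.arg.debug="):
            line = line[1:]
        lines.append(line)
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    sample = (
        "# Enable Remote Debugging\n"
        "#java.arg.debug=-agentlib:jdwp=transport=dt_socket,"
        "server=y,suspend=n,address=8000\n"
    )
    print(enable_remote_debug(sample), end="")
```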

Attach your IDE's remote debugger to port 8000 and trigger the processor; execution will stop at your breakpoint.

So you can use your IDE to debug :)

2.5.3 Bulletin(log) Level

The bulletin level can be changed in each processor's configuration. The web UI offers four levels: DEBUG, INFO, WARN and ERROR; the default is WARN.

You can change the default log level by editing conf/logback.xml, for example to TRACE, so that all log messages are saved in logs/nifi-app.log.

2.6 Index the data in ElasticSearch by using PostgreSQL

2.6.1 JDBC driver

An exception can be thrown if you use the geometry type in PostgreSQL:

Exception: java.lang.IllegalArgumentException: createSchema: Unknown SQL type 1111 cannot be converted to Avro type.

The geometry type cannot be used directly (SQL type 1111 is java.sql.Types.OTHER): it is an add-on provided by the libpostgis-java library, so you must prepare your postgis-jdbc.jar.

# add ppa to your system
sudo add-apt-repository ppa:ubuntugis/ppa
sudo apt-get update

# install
sudo apt-get install libpostgis-java

# list the installed files to locate postgis-jdbc.jar
dpkg -L libpostgis-java
/usr/share/java/postgis-jdbc.jar

Then put postgis-jdbc.jar and the PostgreSQL JDBC driver jar in NiFi's "lib" directory and restart NiFi.

  • These files can then be found in lib.

2.6.2 QueryDatabaseTable (1)

First create a QueryDatabaseTable processor, click the arrow, and add a DBCPConnectionPool.

In the DBCPConnectionPool, enter this connection information.

jdbc:postgresql_postGIS://l143test2.devits.mayenne.l121:5432/brgm

To query the geometry type with SQL:

select
ST_X(ST_SetSRID(geom_wgs84::geometry,4326)) AS "location.lat",
ST_Y(ST_SetSRID(geom_wgs84::geometry,4326)) AS "location.lon"
from bdadresse.lieu_dit limit 10

Select
concat_ws(',',ST_Y(ST_SetSRID(geom_wgs84::geometry,4326)),ST_X(ST_SetSRID(geom_wgs84::geometry,4326))) AS coordinates
From bdadresse.lieu_dit
limit 100
  • ST_SetSRID: sets the SRID on a geometry to a particular integer value.

References: ST_SetSRID, EPSG:4326

We can reformat the input JSON with JoltTransformJSON.

===> input

{
  "gid" : 393857,
  "lat" : -0.3831046220646078,
  "lon" : 43.71104834901107
}

===> joltTransform
[
  {
    "operation": "shift",
    "spec": {
      "*": "&",
      "lon": "coordinates",
      "lat": "coordinates"
    }
  }
]

===> output
{
  "gid": 326966,
  "coordinates": [
    -1.9834433520343622,
    47.88458910168975
  ]
}

The workflow looks like this; you can find the template file in the source.

The mapping in Elasticsearch:

DELETE /adresse
PUT  /adresse

PUT /adresse/
{
    "mappings": {
      "default": {
        "properties": {
          "coordinates": {
            "type": "geo_point"
          },
          "gid": {
            "type": "long"
          }
        }
      }
    }
}

// the disk usage.
GET _cat/indices

2.6.3 QueryDatabaseTable (2)

This article explains how to configure the QueryDatabaseTable processor such that it will only return "new" or "added" rows from a database.

Reference: Incremental Fetch in NiFi with QueryDatabaseTable

In QueryDatabaseTable, you can use functions that exist in the database:

gid,concat_ws(',',ST_Y(ST_SetSRID(geom_wgs84::geometry,4326)),ST_X(ST_SetSRID(geom_wgs84::geometry,4326))) AS coordinates,id,origin_nom,nom,importance,nature,extraction,recette

With this processor you must wait until the whole query finishes; with a lot of data, this can take around 5 minutes. The result set is loaded into memory, and I got an "OutOfMemory" exception.

To balance query time against memory use, split the query into several smaller queries; GenerateTableFetch can do this.

2.6.4 GenerateTableFetch (beta)

Generates SQL select queries that fetch "pages" of rows from a table.
The partition size property, along with the table's row count, determine the size and number of pages and generated FlowFiles.

The generated queries look like the one below, but OFFSET is a poor way to skip previous rows. In our case, queries with an offset above 1000000 take a long time.

select xxx
from table
order by id
limit xxx
offset xxx

If the table has an index on the id column, we want to benefit from it by using a WHERE condition instead, but GenerateTableFetch cannot do that.

select xxx
from table
where id > last_id
order by id
limit xxx

The solution is to use a WHERE condition on the indexed id column, so I tried adding a new field to this processor.

With it, the queries are generated the right way. If you leave this field null, the processor keeps its default behaviour (with OFFSET).
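
To make the difference between the two query shapes concrete, here is a small sketch using Python's built-in sqlite3 module in place of PostgreSQL. The table and column names (`lieu_dit`, `id`, `nom`) and the helper names are assumptions for illustration, and the keyset version assumes ids are positive integers. It is not the code of the modified processor.

```python
import sqlite3


def offset_pages(conn, page_size):
    """Page through the table with LIMIT/OFFSET (the default query shape)."""
    offset = 0
    while True:
        rows = conn.execute(
            "SELECT id, nom FROM lieu_dit ORDER BY id LIMIT ? OFFSET ?",
            (page_size, offset),
        ).fetchall()
        if not rows:
            return
        yield rows
        offset += page_size


def keyset_pages(conn, page_size):
    """Page through the table with WHERE id > last_id (can use the id index)."""
    last_id = 0  # assumption: ids are positive integers
    while True:
        rows = conn.execute(
            "SELECT id, nom FROM lieu_dit WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, page_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        last_id = rows[-1][0]  # remember the last id seen on this page


def make_demo_table():
    """Build a small in-memory table with gaps in the id sequence."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE lieu_dit (id INTEGER PRIMARY KEY, nom TEXT)")
    conn.executemany(
        "INSERT INTO lieu_dit (id, nom) VALUES (?, ?)",
        [(i, f"place-{i}") for i in (1, 2, 5, 8, 13, 21, 34)],
    )
    return conn
```

Both generators return the same pages. The difference is that the OFFSET form has to walk past all skipped rows on every page, while the keyset form starts directly at the last id seen.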

To compile this code, clone the NiFi project from Git.
Execute mvn clean install or, for a parallel build, mvn -T 2.0C clean install

git clone https://github.com/apache/nifi.git
#cd nifi
cd nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/
# build the module to get the nar
mvn clean package
# the nar file will be created in nifi-standard-nar

Put nifi-standard-nar-1.2.0-SNAPSHOT.nar in lib and restart NiFi.

I created a JIRA ticket on issues.apache.org, NIFI-3268,
and a PR: NIFI-3268 Add AUTO_INCREMENT column in GenerateTableFetch to benefit index.
I hope this processor will be improved officially.

2.6.5 DistributeLoad

Distributes FlowFiles to downstream processors based on a Distribution Strategy.
If using the Round Robin strategy, the default is to assign each destination a weighting of 1 (evenly distributed).
The FlowFiles can be distributed across remote machines or processors.
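
The weighting idea can be illustrated with a short sketch. This is a simplified model of weighted round robin, not NiFi's implementation: the function name is hypothetical, relationships are plain string keys, and FlowFiles are any Python objects.

```python
from itertools import cycle


def distribute_round_robin(flowfiles, weights):
    """Assign each item to a destination, honouring per-destination weights."""
    # A destination with weight N appears N times in the rotation.
    schedule = [name for name, weight in weights.items() for _ in range(weight)]
    targets = cycle(schedule)
    assignment = {name: [] for name in weights}
    for flowfile in flowfiles:
        assignment[next(targets)].append(flowfile)
    return assignment


if __name__ == "__main__":
    # Default case: every destination has a weighting of 1.
    print(distribute_round_robin(["a", "b", "c", "d"], {"1": 1, "2": 1}))
```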

2.6.6 ReplaceText

Updates the content of a FlowFile by evaluating a Regular Expression (regex) against it and replacing the section of the content that matches the Regular Expression with some alternate value.

Examples:

//input json
{"gid": 30, "coordinates": "45.8431790218448612,5.42932415510516275", "id": "PAIHABIT0000000008738125", "nom": "la champagne", "nature": "Lieu-dit habité"}
{"gid": 31, "coordinates": "45.8415304654800124,5.43492449207476813", "id": "PAIHABIT0000000008738124", "nom": "la croix trieux", "nature": "Lieu-dit habité"}
{"gid": 32, "coordinates": "45.8629110283579848,4.95922451817658683", "id": "PAIHABIT0000000008738079", "nom": "les balmes", "nature": "Lieu-dit habité"}
  ...

To use the Elasticsearch bulk API, we must add an action header line before each document and end every line with \n.

Reference: regex101, an online regex tester and debugger for PHP, PCRE, Python, Golang and JavaScript

// output

{"index":{"_index":"lieu_dit","_type":"default","_id": 30}}
{"gid":30, "coordinates": "45.8431790218448612,5.42932415510516275", "id": "PAIHABIT0000000008738125", "nom": "la champagne", "nature": "Lieu-dit habité"}

{"index":{"_index":"lieu_dit","_type":"default","_id": 31}}
{"gid":31, "coordinates": "45.8415304654800124,5.43492449207476813", "id": "PAIHABIT0000000008738124", "nom": "la croix trieux", "nature": "Lieu-dit habité"}

{"index":{"_index":"lieu_dit","_type":"default","_id": 32}}
{"gid":32, "coordinates": "45.8629110283579848,4.95922451817658683", "id": "PAIHABIT0000000008738079", "nom": "les balmes", "nature": "Lieu-dit habité"}
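
For comparison, the same input-to-output transformation can be sketched in Python. Note that this swaps the technique: it parses each line as JSON instead of applying a regular expression as ReplaceText does. The function name is hypothetical, and the index and type defaults are taken from the output above.

```python
import json


def to_bulk_body(json_lines, index="lieu_dit", doc_type="default"):
    """Build an Elasticsearch bulk request body from one JSON document per line."""
    out = []
    for line in json_lines:
        line = line.strip()
        if not line:
            continue
        doc = json.loads(line)
        # Action header: index this document under its gid.
        action = {"index": {"_index": index, "_type": doc_type, "_id": doc["gid"]}}
        out.append(json.dumps(action))
        out.append(json.dumps(doc, ensure_ascii=False))
    # The bulk API expects every line, including the last, to end with \n.
    return "\n".join(out) + "\n"


if __name__ == "__main__":
    sample = ['{"gid": 30, "nom": "la champagne", "nature": "Lieu-dit habité"}']
    print(to_bulk_body(sample), end="")
```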

2.6.7 PutElasticsearchHttp

Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as the index to insert into and the type of the document.
However, it cannot index a JSON array, even though this processor uses the bulk API. We could improve it or wait for an update, so I tried PostHttp instead.

Reference: the SplitJson processor

DocumentContext documentContext;

// parse the FlowFile content as JSON
documentContext = validateAndEstablishJsonContext(processSession, original);

final JsonPath jsonPath = JSON_PATH_REF.get();
Object jsonPathResult = documentContext.read(jsonPath);

// only a JSON array can be split; anything else is routed to failure
if (!(jsonPathResult instanceof List)) {
    processSession.transfer(original, REL_FAILURE);
    return; // stop here so the cast below is never reached
}
List resultList = (List) jsonPathResult;

2.6.8 PostHttp

  • url : http://localhost:9200/lieu_dit/_bulk
  • Send as FlowFile : false
  • Use Chunked Encoding : false

2.6.9 When Apache NiFi doesn't work

When NiFi first starts up, the following files and directories are created:

  • content_repository

  • database_repository

  • flowfile_repository

  • provenance_repository

  • work directory

  • logs directory

  • Within the conf directory, the flow.xml.gz file and the templates directory are created.

These directories are created when Apache NiFi is launched.
The flow itself is saved in flow.xml.gz

rm -Rf content_repository flowfile_repository database_repository provenance_repository

The FlowFiles are stored in these repositories. If NiFi has problems starting or cannot restart, you can remove them; any FlowFiles still queued will be lost.

3. PostgreSQL

PostgreSQL has its own access control, so you must add your IP address to the config file.

sudo vi /projet/pgsql/9.5/data/pg_hba.conf

# IP address
host    all             all             127.0.0.1/32            md5
host    all             all             192.168.0.0/16          md5
host    all             all             10.53.0.0/16            md5
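
To check whether a given client address falls inside one of these ranges before editing the file, a quick sketch with Python's standard ipaddress module can help. The function name is hypothetical and the network list simply mirrors the three entries above; it only tests the address ranges, not the database, user or md5 authentication parts of the rules.

```python
import ipaddress

# The three ranges from the pg_hba.conf entries above.
ALLOWED_NETWORKS = ["127.0.0.1/32", "192.168.0.0/16", "10.53.0.0/16"]


def is_allowed(client_ip, networks=ALLOWED_NETWORKS):
    """Return True if client_ip belongs to at least one allowed network."""
    address = ipaddress.ip_address(client_ip)
    return any(address in ipaddress.ip_network(net) for net in networks)


if __name__ == "__main__":
    for ip in ("127.0.0.1", "192.168.1.20", "10.54.0.1"):
        print(ip, is_allowed(ip))
```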

4. ElasticSearch Cluster

Setting up an Elasticsearch cluster is simple: just edit elasticsearch.yml.

# ======================== Elasticsearch Configuration =========================
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: hdfexpertise
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: engine-serv15
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /data/es/elasticsearch-2.4.1/data
#
# Path to log files:
#
path.logs: /data/es/elasticsearch-2.4.1/logs
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host:  0.0.0.0
discovery.zen.ping.unicast.hosts: ["engine-serv15", "engine-serv11","engine-serv10","engine-serv04","engine-serv03"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
#
discovery.zen.minimum_master_nodes: 3
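
The value 3 follows from the formula in the comment above, using integer division. As a small sketch (the function name is hypothetical), it can be computed for any number of master-eligible nodes:

```python
def minimum_master_nodes(master_eligible_nodes: int) -> int:
    """Majority quorum: total number of nodes / 2 + 1, with integer division."""
    if master_eligible_nodes < 1:
        raise ValueError("at least one master-eligible node is required")
    return master_eligible_nodes // 2 + 1


if __name__ == "__main__":
    # Five hosts are listed in discovery.zen.ping.unicast.hosts above.
    print(minimum_master_nodes(5))
```

With the five hosts listed in the unicast hosts setting this gives 3, and it also gives 3 for a four-node cluster.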

5. Final

  • Apache nifi : 1 vm
  • Elasticsearch : 4 vm
  • Total : 5 vm

Mapping for elasticsearch

PUT point_adresse/
{
    "mappings": {
      "default": {
        "properties": {
          "coordinates": {
            "type": "geo_point"
          },
          "gid": {
            "type": "long"
          }
        }
      }
    }
}

This workflow can be found in source/Template_for_point_adresse.xml

  • PostgreSQL -> Data -> JSON -> Elasticsearch

Total time : 16 minutes

To-do list

  1. Apache NiFi with Elasticsearch vs. Apache Camel with Solr

References

  • [1]“Apache NiFi Expression Language Guide,” 27-Dec-2016. [Online]. Available: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html. [Accessed: 27-Dec-2016].

  • [2]“Apche NIFI : problem by using QueryDatabaseTable and putelasticsearch - Hortonworks.” [Online]. Available: https://community.hortonworks.com/questions/72586/how-can-i-use-an-array-with-putelasticsearch.html#answer-75159. [Accessed: 29-Dec-2016].

  • [3]“Creating a Process Group for Twitter Data in NiFi - Hortonworks.” [Online]. Available: https://community.hortonworks.com/articles/58915/creating-a-process-group-for-twitter-data-in-nifi.html. [Accessed: 29-Dec-2016].

  • [4]“Enabling the Zeppelin Elasticsearch interpreter - Hortonworks.” [Online]. Available: https://community.hortonworks.com/articles/54755/enabling-the-zeppelin-elasticsearch-interpreter.html. [Accessed: 29-Dec-2016].

  • [5]“Incremental Fetch in NiFi with QueryDatabaseTable - Hortonworks,” 27-Dec-2016. [Online]. Available: https://community.hortonworks.com/articles/51902/incremental-fetch-in-nifi-with-querydatabasetable.html. [Accessed: 27-Dec-2016].

  • [6]“Jolt Transform Demo,” 27-Dec-2016. [Online]. Available: http://jolt-demo.appspot.com/#inception. [Accessed: 27-Dec-2016].

  • [7]“lmenezes/elasticsearch-kopf,” GitHub, 27-Dec-2016. [Online]. Available: https://github.com/lmenezes/elasticsearch-kopf. [Accessed: 27-Dec-2016].

  • [8]“OFFSET is bad for skipping previous rows.” [Online]. Available: http://Use-The-Index-Luke.com/sql/partial-results/fetch-next-page. [Accessed: 27-Dec-2016].

  • [9]“Online regex tester and debugger: PHP, PCRE, Python, Golang and JavaScript.” [Online]. Available: https://regex101.com/. [Accessed: 27-Dec-2016].

  • [10]“ST_SetSRID,” 27-Dec-2016. [Online]. Available: http://postgis.net/docs/ST_SetSRID.html. [Accessed: 27-Dec-2016].

  • [11]“Using NiFi GetTwitter, UpdateAttributes and ReplaceText processors to modify Twitter JSON data. - Hortonworks.” [Online]. Available: https://community.hortonworks.com/articles/57803/using-nifi-gettwitter-updateattributes-and-replace.html. [Accessed: 29-Dec-2016].

  • [12]“WGS 84: EPSG Projection -- Spatial Reference,” 27-Dec-2016. [Online]. Available: http://spatialreference.org/ref/epsg/wgs-84/. [Accessed: 27-Dec-2016].
