Logstash input jdbc is duplicating results

Rafael Lima picture Rafael Lima · Sep 10, 2015 · Viewed 8.4k times · Source

I'm using logstash input jdbc plugin to read two (or more) databases and send the data to elasticsearch, and using kibana 4 to vizualize these data.

This is my logstash config:

input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }

jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }
}
filter {

}
output {

    if [type] == "A" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-servera-%{+YYYY.MM.dd}"
        }    
    }
    if [type] == "B" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-serverb-%{+YYYY.MM.dd}"
        }    
    }

  stdout { codec => rubydebug }
}

The problem is that every time run the logstash, it starts to save all data that is already in elastic search.

After run with the where clause = date > '2015-09-10' I'd stoped the logstash and run again (with --debug) with the 'special parameter' :sql_last_date. After the logstash startup It starts to show this in the log:

←[36mExecuting JDBC query {:statement=>"SELECT \n\tSUBSTRING(R.RECEBEDOR, 1, 2)
AS 'DDD',\nCASE WHEN R.STATUS <>  'RCON' AND R.COD_RESPOSTA in (428,429,230,425,
430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'  \n       W
HEN R.STATUS = 'RCON' THEN 'SUCESSO'\n\t   ELSE 'ERRO'\n   END AS 'TIPO_MENSAGEM
',\nAP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC
_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APL
ICACAO, RECEBEDOR, R.ID_OPERADORA, R.TIPO_PRODUTO \n\nFROM RECARGA R (NOLOCK)\nJ
OIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO \nwhere R.DT_RECARGA > :sql
_last_start\nORDER BY R.DT_RECARGA ASC", :parameters=>{:sql_last_start=>2015-09-
10 18:48:00 UTC}, :level=>:debug, :file=>"/DEV/logstash-1.5.4/vendor/bundle/jrub
y/1.9/gems/logstash-input-jdbc-1.0.0/lib/logstash/plugin_mixins/jdbc.rb", :line=
>"107", :method=>"execute_statement"}←[0m

This time i ran with the 'real' statement that is:

SELECT 
    SUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',
CASE WHEN R.STATUS <>  'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'  
       WHEN R.STATUS = 'RCON' THEN 'SUCESSO'
       ELSE 'ERRO'
   END AS 'TIPO_MENSAGEM',
AP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA

FROM RECARGA R (NOLOCK)
JOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO 
where R.DT_RECARGA > :sql_last_start
ORDER BY R.DT_RECARGA ASC

Anyone knows how to solve it?

Thanks!

Answer

Raghavendra picture Raghavendra · May 3, 2016

sql_last_start is now sql_last_valueplease check here the special parameter sql_last_start is now renamed to sql_last_value for better clarity as it is not only limited to datetime but may have other column type as well. so now solution may be something like this

input {
jdbc {
     type => "A"
     jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-  jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
     jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
     jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
     jdbc_user => "user"
     jdbc_password => "pass"
     schedule => "5 * * * *"
     use_column_value => true
     tracking_column => date
     statement => "SELECT id, date, content, status from test_table WHERE date >:sql_last_value"
    #clean_run true means it will reset sql_last_value to zero or initial value if datatype is date(default is also false)
     clean_run =>false
   }
jdbc{
  #for type B....
  }
}

i have tested with sql Server DB

please run for first time with clean_run=>ture to avoid datatype error while in development we may have different datatype value stored in sql_last_value variable