Using logstash-input-jdbc to synchronize MySQL data to ES, the last record is not saved

Problem description

When I use the logstash-input-jdbc plugin to synchronize MySQL data to ES, the SQL query returns 13 rows, but only 12 are saved to ES. Each time I add a row to the table and it is synchronized, the newest record is the one left unsaved. When I exit logstash from the terminal (Ctrl + C), it first writes that last record to ES and then exits the process.

Log of the execution output: (screenshot not included)

The following log is generated when I press Ctrl + C to exit logstash:

[2018-10-31T11:40:07,422][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}
{"name":"Kimi","id":14,"todos":[],"todo_list":[],"tags":["_aggregatefinalflush"]}

This log shows that logstash did write the unsaved record to ES before exiting, but the document carries an extra tags field marked _aggregatefinalflush.
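
If that marker tag is unwanted in the stored document, it can be stripped before output. A minimal sketch, assuming the rest of the filter chain stays as-is:

filter {
    mutate {
        # drop the tag that aggregate adds when it flushes maps at shutdown
        remove_tag => ["_aggregatefinalflush"]
    }
}

This only hides the symptom, though; the flush timing itself is what needs fixing (see the answer at the end).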

Platform versions where the problem occurs

ElasticSearch version -> 5.5.2
Logstash version -> 5.5.2

Development and test platform: macOS Mojave 10.14

Related code

Logstash configuration:

input {
    stdin{
    }
    jdbc {
        # MySQL connection string
        jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
        
        # Validate the connection before using it
        jdbc_validate_connection => true
        
        # Database user
        jdbc_user => "root"
        
        # Database password
        jdbc_password => "123456"
        
        # Path to the JDBC driver jar
        jdbc_driver_library => "/Users/simon/logstash/mysql-connector-java-5.1.36.jar"
        
        # JDBC driver class
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        
        # Inline SQL statement; for a long query, put it in a file and use statement_filepath instead
        # statement => "select * from users"
        
        # Path to the file containing the SQL query to execute
        statement_filepath => "/Users/simon/logstash/mysql_users.sql"
        
        # Schedule, in Crontab format
        # "* * * * *" runs once every minute
        schedule => "* * * * *"

        # When paging is enabled, the SQL query is fetched in pages of jdbc_page_size rows
        # jdbc_paging_enabled => true
        # with jdbc_paging_enabled = true, jdbc_page_size defaults to 100000
        # jdbc_page_size => 1
        
        # ElasticSearch document type; for ES 6.x set document_type in the output instead, and it is removed in 7.x
        #type => "users"

        # Record the last run, used together with tracking_column and last_run_metadata_path
        record_last_run => "true"
        last_run_metadata_path => "/Users/simon/logstash/sync_last_id"

        # Set to true to ignore the value saved at last_run_metadata_path and start over
        # clean_run => "false"

        # Use a column value (tracking_column) as :sql_last_value instead of the last run timestamp; requires record_last_run
        use_column_value => true

        # When use_column_value is true, tracking_column names the column to track; here the MySQL id column
        tracking_column => "id"

        # Whether to force lowercasing of column names
        #lowercase_column_names => "false"
    }
}

filter {
    aggregate {
        task_id => "%{id}"
        code => "
            # merge the joined todo rows into one document per user
            map["id"] = event.get("id")
            map["name"] = event.get("name")
            map["todo_list"] ||=[]
            map["todos"] ||=[]

            if (event.get("todo_id") != nil)
                if !(map["todo_list"].include? event.get("todo_id"))
                    map["todo_list"] << event.get("todo_id")        
                    map["todos"] << {
                        "todo_id" => event.get("todo_id"),
                        "title" => event.get("text"),
                    }
                end
            end

            event.cancel()
        "
        push_previous_map_as_event => true
    }
    
    json {
        source => "message"
        remove_field => ["message"]
        #remove_field => ["message", "type", "@timestamp", "@version"]
    }
    mutate  {
        # drop logstash metadata fields so they are not written to ES
        remove_field => ["@timestamp", "@version"]
    }
}

# Output the MySQL data to ElasticSearch
output {
    elasticsearch {
        # ES URL
        hosts => ["127.0.0.1:9200"]
        
        # ES index name
        index => "mysql_users"

        # document_type is set in the output for ES 6.x; removed in 7.x
        document_type => "users"
        
        # use the table's id column as the ES document ID
        document_id => "%{id}"

        codec => "json"
    }
    stdout {
        codec => json_lines
    }
}
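
To verify how many documents actually landed in the index after a run, a quick count against ES (assuming the default host/port and the index name configured above):

curl -s "http://127.0.0.1:9200/mysql_users/_count?pretty"

With the 13 users inserted below, the count stays at 12 until the pipeline shuts down and the final flush runs.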

Content of the executed SQL file mysql_users.sql:

SELECT 
`users`.`id` AS `id`,
`users`.`name` AS `name`,
`todo`.`id` AS `todo_id`,
IFNULL(`todo`.`text`, "") AS `text`,
IFNULL(`todo`.`is_done`, 0) AS `is_done`,
`todo`.`user_id` AS `user_id`
FROM `users` 
LEFT JOIN `todo` ON  `users`.`id` = `todo`.`user_id`
WHERE `users`.`id` > :sql_last_value
ORDER BY `id` ASC
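
The :sql_last_value placeholder in the WHERE clause is filled in from the file configured as last_run_metadata_path. Since use_column_value => true and tracking_column => "id", the sync_last_id file stores the last id seen, serialized as YAML; after a run that reached user 14 it should look roughly like this (shown as an illustration of the plugin's YAML format):

--- 14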

Database table creation script and test data:

DROP TABLE IF EXISTS `todo`;

CREATE TABLE `todo` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `text` varchar(255) NOT NULL DEFAULT "",
  `is_done` tinyint(3) DEFAULT "0",
  `user_id` int(11) DEFAULT "0",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

LOCK TABLES `todo` WRITE;
/*!40000 ALTER TABLE `todo` DISABLE KEYS */;

INSERT INTO `todo` (`id`, `text`, `is_done`, `user_id`)
VALUES
    (3,"bbbbb",0,1),
    (4,"cccccc",0,1),
    (5,"",0,2),
    (6,"Vue",0,2),
    (7,"Hello world",0,11),
    (8,"",0,11),
    (10,"",0,1),
    (11,"",0,1),
    (12,"",0,9),
    (13,"",0,9);

/*!40000 ALTER TABLE `todo` ENABLE KEYS */;
UNLOCK TABLES;

DROP TABLE IF EXISTS `users`;

CREATE TABLE `users` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT "",
  `version` int(11) DEFAULT "0",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

LOCK TABLES `users` WRITE;
/*!40000 ALTER TABLE `users` DISABLE KEYS */;

INSERT INTO `users` (`id`, `name`, `version`)
VALUES
    (1,"Simon",0),
    (2,"Jerry",0),
    (4,"Jim",0),
    (5,"Mary",0),
    (6,"Amy",0),
    (7,"Kaiven",0),
    (8,"Bell",0),
    (9,"Sky",0),
    (10,"Sam",0),
    (11,"Lily",0),
    (12,"Lucy",0),
    (13,"David",0),
    (14,"Kimi",0);

/*!40000 ALTER TABLE `users` ENABLE KEYS */;
UNLOCK TABLES;

What result do you expect? What error message do you actually see?

I expect the last record to be synchronized to ES right away as well, instead of only being written when the process shuts down.

Execution otherwise looks normal, with no error prompts or warnings.

Oct.07,2021

push_previous_map_as_event => false
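
By itself this setting would stop aggregate from emitting events at all, so it is meant to be paired with a timeout-based flush. A minimal sketch of that combination (the 5-second timeout is an illustrative value, and the accumulation code is abbreviated):

filter {
    aggregate {
        task_id => "%{id}"
        code => "
            # ... same accumulation code as in the original filter ...
        "
        # do not wait for the next task_id to start a new map; instead
        # flush each map as an event after `timeout` seconds of inactivity
        push_previous_map_as_event => false
        push_map_as_event_on_timeout => true
        timeout => 5
    }
}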

Thanks again to @Game for his answer.
Here is a brief summary of the problem: the event map that the aggregate filter builds has no way of knowing when my event stream is finished, i.e. which row is the last one. You can set a timeout to tell it how many seconds to wait before flushing one map and moving on to the next, but that is not very rigorous, because you cannot be sure how long building an event map will take. Going back to the official documentation, the better approach is to give the task an explicit end condition (sketched below); see the official description of aggregate: https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html#plugins-filters-aggregate-timeout_timestamp_field
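
For completeness, the end-condition pattern has the shape sketched below. It presupposes some event that marks the end of each task; the is_last field here is hypothetical, e.g. a marker column that the SQL query above does not currently produce:

filter {
    if [is_last] {
        # the marker event closes the task: copy the accumulated map onto
        # this event, which then becomes the document sent to ES
        aggregate {
            task_id => "%{id}"
            code => "event.set('todos', map['todos'])"
            map_action => "update"
            end_of_task => true
        }
    } else {
        # ordinary rows accumulate into the per-user map and are discarded
        aggregate {
            task_id => "%{id}"
            code => "
                map['todos'] ||= []
                map['todos'] << event.get('todo_id')
                event.cancel()
            "
            map_action => "create_or_update"
        }
    }
}

With an explicit end of task there is no timeout guessing: each user's document is flushed as soon as its closing event arrives.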
