ElasticSearch about using Logstash-input-jdbc to Import MySQL one-to-many relationship JSON structure

problem description

when importing mysql one-to-many structured data into ElasticSearch with logstash-input-jdbc, there is a phenomenon that the data queried by connected tables are not correctly stored in the array, and it is occasionally normal.

the platform version of the problem and what methods you have tried

system environment: MacOS Mojave 10.14
Mysql version: 5.6.23
ElasticSearch version: 5.5.2
logstash version: 6.4.2

related codes

logstash.conf configuration

input {
    stdin{
    }
    jdbc {
        -sharp Mysql
        jdbc_connection_string => "jdbc:mysql://localhost:3306/yii2basic"
        
        -sharp 
        jdbc_validate_connection => true
        
        -sharp 
        jdbc_user => "root"
        
        -sharp 
        jdbc_password => "******"
        
        -sharp JDBC
        jdbc_driver_library => "/Users/simon/logstash/mysql-connector-java-5.1.46.jar"
        
        -sharp JDBC 
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        
        -sharp  SQL   SQL  statement_filepath 
        -sharp statement => "select * from users"
        
        -sharp  SQL  SQL
        statement_filepath => "/Users/simon/logstash/mysql_users.sql"
        
        -sharp CronJob   Crontab 
        -sharp  *
        schedule => "* * * * *"
        
        -sharp ElasticSearch Domcument type  ES 6.x  output  document_type  7.x
        -sharptype => "users"  

        -sharp , , tracking_column , last_run_metadata_path 
        record_last_run => "true"
        last_run_metadata_path => "/Users/simon/logstash/sync_last_id"

        -sharp  last_run_metadata_path ,
        -sharp clean_run => "false"

        -sharp column ,record_last_run, track  column  true.  track  timestamp 
        use_column_value => true

        -sharp  use_column_value ,. track  column , column . mysql
        tracking_column => "id"

        -sharp  (column) 
        -sharplowercase_column_names => "false"
    }
}

filter {
    aggregate {
        task_id => "%{id}"
        code => "
            -sharp
            map["id"] = event.get("id")
            map["name"] = event.get("name")
            map["todo_list"] ||=[]
            map["todos"] ||=[]

            if (event.get("todo_id") != nil)
                if !(map["todo_list"].include? event.get("todo_id"))
                    map["todo_list"] << event.get("todo_id")        
                    map["todos"] << {
                        "todo_id" => event.get("todo_id"),
                        "title" => event.get("text"),
                    }
                end
            end

            event.cancel()
        "
        push_previous_map_as_event => true
    }
    json {
        source => "message"
        remove_field => ["message"]
        -sharpremove_field => ["message", "type", "@timestamp", "@version"]
    }
    mutate  {
        -sharpJSON ES 
        remove_field => ["tags", "@timestamp", "@version"]
    }
}

-sharp  MySQL ElasticSearch 
output {
    elasticsearch {
        -sharp ES URL
        hosts => "127.0.0.1:9200"
        
        -sharp ES 
        index => "mysql_users"

        -sharp document_type  ES 6.x  output  document_type  7.x
        document_type => "users"
        
        -sharp ID  ID
        document_id => "%{id}"
    }
    stdout {
        -sharpcodec => json_lines
    }
}

mysql_users.sql

SELECT 
`users`.`id` AS `id`,
`users`.`name` AS `name`,
`todo`.`id` AS `todo_id`,
IFNULL(`todo`.`text`, "") AS `text`,
IFNULL(`todo`.`is_done`, 0) AS `is_done`,
`todo`.`user_id` AS `user_id`
FROM `users` 
LEFT OUTER JOIN `todo` ON `todo`.`user_id` = `users`.`id` 
WHERE `todo`.id > 0
-- WHERE `users`.`id` > :sql_last_value
ORDER BY `id` 

results of query from database

clipboard.png

content after importing ElasticSearch


GET mysql_users/users/_search
{
  "size": 200,
  "sort" : [
    {"id" : "desc"}  
  ]
}

:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": null,
    "hits": [
      {
        "_index": "mysql_users",
        "_type": "users",
        "_id": "11",
        "_score": null,
        "_source": {
          "id": 11,
          "name": "Lily",
          "todos": [
            {
              "todo_id": 8,
              "title": ""
            }
          ],
          "todo_list": [
            8
          ]
        },
        "sort": [
          11
        ]
      },
      {
        "_index": "mysql_users",
        "_type": "users",
        "_id": "2",
        "_score": null,
        "_source": {
          "id": 2,
          "name": "Jerry",
          "todos": [
            {
              "todo_id": 5,
              "title": "dddddd"
            }
          ],
          "todo_list": [
            5
          ]
        },
        "sort": [
          2
        ]
      },
      {
        "_index": "mysql_users",
        "_type": "users",
        "_id": "1",
        "_score": null,
        "_source": {
          "id": 1,
          "name": "Simon",
          "todos": [
            {
              "todo_id": 3,
              "title": "bbbbb"
            },
            {
              "todo_id": 4,
              "title": "cccccc"
            }
          ],
          "todo_list": [
            3,
            4
          ]
        },
        "sort": [
          1
        ]
      }
    ]
  }
}

what result do you expect? What is the error message actually seen?

it is hoped that each DOM content todos and todo_list stored in ES can correspond to the number of results of the database query and be consistent with it. Now only the data with id = 1 is normal. 2 and 11 are missing a

.
Aug.27,2021

first, it is officially stated that logstash and elasticsearch versions should be the same;
second, does logstash have warn or error logs?


Brother if in an one-to-many relationship, there is still an one-to-many relationship.


according to your way, the one-to-many child nodes I write will only have one piece of data (in fact, there should be multiple)

Menu