一、logstatsh下载

对应Elasticsearch版本下载logstatsh,官网下载较慢,推荐 华为开源镜像站 下载,

地址如下:https://mirrors.huaweicloud.com/logstash/

这里下载版本为:logstash-7.3.2

二、解压使用

执行如下命令解压:

tar -zxvf logstash-7.3.2.tar.gz

将解压后的文件移动至 opt 目录下

mv logstash-7.3.2 /opt

三、配置使用

在 logstash-7.3.2 下 新建 sync 文件夹用于存放数据同步配置

cd /opt/logstash-7.3.2
mkdir sync 
cd /sync

在 sync 下创建 logstash-company-sync.conf 文件

并将MySQL驱动拷贝至该文件夹下,同时拷贝一份至logstash-7.3.2/logstash-core/lib/jars下

推荐 logstash-company-sync.conf 配置如下:

input {
	jdbc{
		# 设置 MySql/MariaDB 数据库url以及数据库名称 
		jdbc_connection_string => "jdbc:mysql://localhost:3306/recruit?useUnicode=true&characterEncoding=utf-8&useSSL=false"
		
		# 用户名和密码 
		jdbc_user => "root"
		jdbc_password => "dingwh"
		
		# 数据库驱动所在位置,可以是绝对路径或者相对路径
		# 并将驱动拷贝至logstash-7.3.2/logstash-core/lib/jars下
		jdbc_driver_library => "/opt/logstash-7.3.2/sync/mysql-connector-java-5.1.47.jar"
		
		# 驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		
		# 开启分页
		jdbc_paging_enabled => "true"
		
		# 自定义分页数量
		jdbc_page_size => "1000"
		
		# 时区
		jdbc_default_timezone =>"Asia/Shanghai"
		
		# 执行的sql文件路径
		# 或 statement => "SELECT id,company_name as companyName FROM b_company_info"
		statement_filepath => "/opt/logstash-7.3.2/sync/company.sql"
		
		# 执行任务时间间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		
		# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
		record_last_run => true
		
		# 记录 tracking_column 字段的值
		last_run_metadata_path => "/opt/logstash-7.3.2/sync/logstash_last_time"
		
		# 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值
		use_column_value => true
		
		# 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
		tracking_column => "uptime"
		
		# tracking_column 对应字段的类型 numeric,timestamp,默认numeric
		tracking_column_type => "timestamp"
		
		# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
		clean_run => false
		
		# 数据库字段名称大写转小写
		lowercase_column_names => false
	}
}

output {
    elasticsearch {
		# es地址
		hosts => ["192.168.3.130:9200"]
		
		# 同步的索引名
		index => "company_index"
		
		# 设置_docID和数据相同
		document_id => "%{id}"
	}
    stdout {
		codec => json_lines
	}
}
 

推荐company.sql如下:

# 由于uptime在数据库是秒时间戳,Long类型,这里转化为标准时间格式 FROM_UNIXTIME(uptime)
SELECT id,company_name AS companyName,ctime,FROM_UNIXTIME(uptime) AS uptime
FROM company_info
WHERE uptime >= :sql_last_value

四、运行

执行如下命令进行同步

# 进入bin目录
cd /opt/logstash-7.3.2/bin
# 执行
./logstash -f /opt/logstash-7.3.2/sync/logstash-company-sync.conf

五、 自定义模板配置中文分词

​ 目前的数据同步,mappings映射会自动创建,但是分词不会,所以需要自己配置中文分词

# 查询默认分词模板配置
GET /_template/logstash

方式一:

注:若该方式无效可使用方式二

将返回的结果保存为json文件,命名为: logstash-ik.json

修改其中部分内容,修改后内容如下:

# 去掉了上一步请求返回json的外层 logstash ,并在string_fields下新增"analyzer":"ik_max_word"

{
	"order": 0,
	"version": 60002,
	"index_patterns": [
		"*"
	],
	"settings": {
		"index": {
			"number_of_shards": "1",
			"refresh_interval": "5s"
		}
	},
	"mappings": {
		"dynamic_templates": [
			{
				"message_field": {
					"path_match": "message",
					"mapping": {
						"norms": false,
						"type": "text"
					},
					"match_mapping_type": "string"
				}
			},
			{
				"string_fields": {
					"mapping": {
						"norms": false,
						"type": "text",
						"analyzer":"ik_max_word",
						"fields": {
							"keyword": {
								"ignore_above": 256,
								"type": "keyword"
							}
						}
					},
					"match_mapping_type": "string",
					"match": "*"
				}
			}
		],
		"properties": {
			"@timestamp": {
				"type": "date"
			},
			"geoip": {
				"dynamic": true,
				"properties": {
					"ip": {
						"type": "ip"
					},
					"latitude": {
						"type": "half_float"
					},
					"location": {
						"type": "geo_point"
					},
					"longitude": {
						"type": "half_float"
					}
				}
			},
			"@version": {
				"type": "keyword"
			}
		}
	},
	"aliases": {}
}

保存至:/opt/logstash-7.3.2/sync

配置第三步的 logstash-company-sync.conf 文件,output 中进行配置,修改后的output如下

output {
    elasticsearch {
		# es地址
		hosts => ["192.168.3.130:9200"]
		
		# 同步的索引名
		index => "company_index"
		
		# 设置_docID和数据相同
		document_id => "%{id}"
		
		# 定义模板名称
		template_name => "company_ik"
		
		# 模板所在位置(模板里所存内容就是上面修改的模板)
		template => "/opt/logstash-7.3.2/sync/logstash-ik.json"
		
		# 重写模板
		template_overwrite => true
		
		# 默认为true,false关闭logstash自动管理模板功能,如果自定义模板,则设置为false
		manage_template => false
	}
    stdout {
		codec => json_lines
	}
}

再次启动即可

方式二:

配置分词器模板至elasticsearch,新增模板,匹配company开头的索引

ik_logstash:模板名字

# URL 
PUT /_template/ik_logstash

# Body json
{
	"order": 0,
	"version": 60002,
	"index_patterns": [
		"company*"
	],
	"settings": {
		"index": {
			"number_of_shards": "1",
			"refresh_interval": "5s"
		}
	},
	"mappings": {
		"dynamic_templates": [
			{
				"message_field": {
					"path_match": "message",
					"mapping": {
						"norms": false,
						"type": "text"
					},
					"match_mapping_type": "string"
				}
			},
			{
				"string_fields": {
					"mapping": {
						"norms": false,
						"type": "text",
						"analyzer":"ik_max_word",
						"fields": {
							"keyword": {
								"ignore_above": 256,
								"type": "keyword"
							}
						}
					},
					"match_mapping_type": "string",
					"match": "*"
				}
			}
		],
		"properties": {
			"@timestamp": {
				"type": "date"
			},
			"geoip": {
				"dynamic": true,
				"properties": {
					"ip": {
						"type": "ip"
					},
					"latitude": {
						"type": "half_float"
					},
					"location": {
						"type": "geo_point"
					},
					"longitude": {
						"type": "half_float"
					}
				}
			},
			"@version": {
				"type": "keyword"
			}
		}
	},
	"aliases": {}
}

按照步骤三种的配置启动logstash即可(需第一次同步时使用)

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐