搜索引擎——Elasticsearch
Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,**它可以快速地储存、搜索和分析海量数据。**作为 Elastic Stack 的核心,Elasticsearch 会集中存储您的数据,让您飞快完成搜索,微调相关性,进行强大的分析,并轻松缩放规模。Elasticsearch:官方分布式搜索和分析引擎 | Elastic**Mapping 是用来定义一个文档(doc
文章目录
该文章已同步收录到我的博客网站,欢迎浏览我的博客网站,xhang’s blog
1.ElasticSearch简介
Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,**它可以快速地储存、搜索和分析海量数据。**作为 Elastic Stack 的核心,Elasticsearch 会集中存储您的数据,让您飞快完成搜索,微调相关性,进行强大的分析,并轻松缩放规模。
elasticsearch官网:Elasticsearch:官方分布式搜索和分析引擎 | Elastic
官方文档地址:Elasticsearch Guide | Elastic
2.基本概念
-
index(索引)
动词,相当于mysql的insert
名词,相当于mysql的Database
-
Type(类型)
在index(索引)中,可以定义一个或多个类型。类似于MySQL当中的Table,每一种类型的数据放在一起。
- Document(文档)
保存在某个索引(Index) 下,某种类型(Type) 的一个数据(Document) ,文档是JSON格式的,Document就像 是MySQL中的某个Table里面的内容;
3.Elasticsearch概念-倒排索引
- 分词
将整句分拆为单词
-
报错的记录
-
红海行动
-
探索红海行动
-
红海特别行动
-
红海记录篇
-
特工红海特别探索
-
-
检索
- 红海特工行动
- 红海行动
-
相关性得分
词 | 记录 |
---|---|
红海 | 1,2,3,4,5 |
行动 | 1,2,3 |
探索 | 2,5 |
特别 | 3,5 |
记录篇 | 4 |
特工 | 5 |
4.Elasticsearch和Kibana的安装
- 拉取镜像
docker pull elasticsearch:7.4.2 //存储和检索数据
docker pull kibana:7.4.2 //可视化检索数据
- 创建挂载目录
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
// 任何远程机器都能访问es
echo "http.host: 0.0.0.0" >> /mydata/elasticsearch/config/elasticsearch.yml
chmod -R 777 /mydata/elasticsearch/ 改变文件权限
- 创建elasticsearch容器实例
docker run -d -p 9200:9200 -p 9300:9300 \
--restart=always \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms64m -Xmx512m" \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
--name elasticsearch elasticsearch:7.4.2
命令解释:
-e “discovery.type=single-node”:使es单节点运行
-e ES_JAVA_OPTS=“-Xms64m -Xmx512m”:设置es占用的内存
- 开放对应的端口
firewall-cmd --zone=public --add-port=9200/tcp --permanent
firewall-cmd --zone=public --add-port=9300/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --zone=public --query-port=9200/tcp
- 访问9200端口测试
- 创建kibana容器实例
docker run -d -p 5601:5601 \
--restart=always \
-e ELASTICSEARCH_HOSTS=http://192.168.26.160:9200 \
--name kibana \
kibana:7.4.2
- 开放对应端口
firewall-cmd --zone=public --add-port=5601/tcp --permanent
systemctl restart firewalld.service
- 访问对应的5601端口
5.Elasticsearch入门操作
5.1_cat
GET /_cat/nodes
:查看所有节点
GET /_cat/health
:查看 es 健康状况
GET /_cat/master
:查看主节点
GET /_cat/indices
:查看所有索引
相当于Mysql数据库的show databases
5.2PUT&POST新增数据
PUT 和 POST 都可以, POST 新增。如果不指定 id,会自动生成 id。指定 id 就会修改这个数据,并新增版本号 。
PUT 可以新增可以修改。PUT 必须指定 id;由于 PUT 需要指定 id,我们一般都用来做修改操作,不指定 id 会报错。
保存一个数据,保存在哪个索引的哪个类型下,指定用哪个唯一标识 PUT customer/external/1;
在 customer 索引下的 external 类型下保存 1 号数据为
POST /customer/external/1
保存的数据
{
"name": "zhangsan"
}
如果当前索引不存在就会自动创建:
5.3PUT&POST修改数据
- POST /customer/external/1/_update
{
"doc": {
"name": "lisi"
}
}
此种方式会查询数据,如果相同不允许修改
- POST /customer/external/1/
{
"name": "zhangsan"
}
- PUT /customer/external/1/
{
"name": "zhangsan"
}
5.4GET查询数据
GET /索引/类型/数据id
结果:
{
"_index": "customer",//在哪个索引
"_type": "external",//在哪个类型
"_id": "1", //记录 id
"_version": 6,//版本号
"_seq_no": 5,//并发控制字段,每次更新就会+1,用来做乐观锁
"_primary_term": 1,//同上,主分片重新分配,如重启,就会变化
"found": true, //查询是否成功
"_source": { //真正的内容
"name": "zhangsan"
}
}
5.5DELETE删除数据
DELETE /customer/external/1
5.7bulk批量操作
POST /customer/external/_bulk
{"index":{"_id":"1"}}
{"name":"John Doe"}
{"index":{"_id":"2"}}
{"name":"Jane Doe"}
语法格式:
{ action: { metadata }}\n
{ request body }\n
{ action: { metadata }}\n
{ request body }\n
切换到kibana的Dev Tools
指定命令
5.6乐观锁字段
对应GET的查询结果,其中有一个字段_sep_no
,其为了是进行并发控制,当修改数据后,对应的_sep_no
版本就会更改。
实现方式:
更新携带 ?if_seq_no=*&if_primary_term=1
再次指定_sep_no
为相同值将不会生效
6.Elasticsearch进阶操作
6.1批量导入测试数据
POST /bank/account/_bulk
测试数据地址:https://gitee.com/xu-huaiang/codes/nbqcg1dsfh6vutk4o8mxy65
6.2Search API
ES 支持两种基本方式检索 :
-
一个是通过使用 REST request URI 发送搜索参数(uri+检索参数)
-
另一个是通过使用 REST request body 来发送它们(uri+请求体)
一切检索从
_search
开始
- GET bank/_search
检索 bank 下所有信息,包括 type 和 docs
- GET bank/_search?q=*&sort=account_number:asc
请求参数方式检索(按照account_number
升序排列)
- uri+请求体进行检索
GET bank/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"account_number": "asc"
},
{
"balance": "desc"
}
]
}
6.3Query DSL
Elasticsearch 提供了一个可以执行查询的 Json 风格的 DSL(domain-specific language 领域特定语言)。这个被称为 Query DSL。该查询语言非常全面,并且刚开始的时候感觉有点复杂。
- 一个查询语句的典型结构
{
QUERY_NAME: {
ARGUMENT: VALUE,
ARGUMENT: VALUE,...
}
}
- 如果是针对某个字段,那么它的结构如下
{
QUERY_NAME(查询条件): {
FIELD_NAME(字段名): {
ARGUMENT(参数名): VALUE(参数值),
ARGUMENT: VALUE,...
}
}
}
例子:
GET /bank/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"balance": {
"order": "desc"
}
}
],
"from": 0,
"size": 3
}
查询结果:
6.3.1_source返回部分字段
可以采用_source
字段规定查询的参数名
GET /bank/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"balance": {
"order": "desc"
}
}
],
"_source": ["firstname","balance"],
"from": 0,
"size": 3
}
6.3.2match精确/模糊查询
- 可以精确匹配字段值
GET /bank/_search
{
"query": {
"match": {
"account_number": "1"
}
}
}
即精确匹配account_number
为1的数据
- 也可以模糊匹配 ,匹配不区分大小写,排序按照
倒排索引
排序
- 也可以在参数上使用
keyword
进行精确匹配,并且区分大小写
GET /bank/_search
{
"query": {
"match": {
"address.keyword": "198 Mill Lane"
}
}
}
6.3.3match_phrase分组匹配
将需要匹配的值当成一个整体单词(不分词)进行检索
GET /bank/_search
{
"query": {
"match_phrase": {
"address": "mill lane"
}
}
}
6.3.4multi_match多字段匹配
对指定的多个字段进行关键字配置
GET /bank/_search
{
"query": {
"multi_match": {
"query": "mill movico",
"fields": ["address","city"]
}
}
}
6.3.5bool复合查询
bool 用来做复合查询:
复合语句可以合并任何其它查询语句,包括复合语句,了解这一点是很重要的。这就意味着,复合语句之间可以互相嵌套,可以表达非常复杂的逻辑。
GET /bank/_search
{
"query": {
"bool": {
"must": [ //必须满足的条件
{
"match": { //参数匹配
"gender": "F"
}
},
{
"match": { //参数匹配
"address": "mill"
}
}
],
"must_not": [ //必须不满足的
{
"match": {//参数匹配
"age": "38"
}
}
],
"should": [ //应该满足的
{
"match": { //参数匹配
"lastname": "Long"
}
}
]
}
}
}
6.3.6filter结果过滤
filter
与之前的must
不同的是,filter
不会计算相关性得分
并不是所有的查询都需要产生分数,特别是那些仅用于 “filtering”(过滤)的文档。为了不计算分数 Elasticsearch 会自动检查场景并且优化查询的执行。
测试:查询年龄在10到20之间的数据
GET /bank/_search
{
"query": {
"bool": {
"must": [
{
"range": {
"age": {
"gte": 10,
"lte": 20
}
}
}
]
}
}
}
不使用filter,会有相关项得分
使用filter,相关性得分为0
6.3.7term查询
和 match 一样。匹配某个属性的值。全文检索字段用 match,其他非 text(文本)字段匹配用 term
GET /bank/_search
{
"query": {
"term": {
"age": {
"value": "22"
}
}
}
}
6.4执行聚合(aggregations)
6.4.1执行聚合的概念
**聚合提供了从数据中分组和提取数据的能力。**最简单的聚合方法大致等于 SQL GROUP BY 和 SQL 聚合函数。
在 Elasticsearch 中,您有执行搜索返回 hits(命中结果),并且同时返回聚合结果,把一个响应中的所有 hits(命中结果)分隔开的能力。这是非常强大且有效的, 您可以执行查询和多个聚合,并且在一次使用中得到各自的(任何一个的)返回结果,使用 一次简洁和简化的 API 来避免网络往返。
执行聚合文档地址:[Aggregations | Elasticsearch Guide 7.5] | Elastic
6.4.2测试
搜索 address 中包含 mill 的所有人的年龄分布以及平均年龄,但不显示这些人的详情。
GET /bank/_search
{
"query": {
"match": {
"address": "mill"
}
},
"aggs": {
"ageData": {
"terms": { //terms执行聚合age参数,并查询前5条数据
"field": "age",
"size": 5
}
},
"ageAvg": { //平均年龄
"avg": {
"field": "age"
}
}
},
"size": 0 //不显示搜索结果
}
6.4.3子聚合
按年龄进行分组,并且计算出组内的平均薪资
在聚合当中再使用聚合进行查询
GET /bank/_search
{
"query": {
"match_all": {}
},
"aggs": {
"aggGroup": {
"terms": {
"field": "age",
"size": 1000
},
"aggs": {
"ageAvg": {
"avg": {
"field": "balance"
}
}
}
}
},
"size": 0
}
6.4.4综合测试
按照年龄段进行分组,并求出该年龄段的平均信息,和该年龄段的男女数和各男女的平均薪资
GET /bank/_search
{
"query": {
"match_all": {}
},
"aggs": {
"aggGroup": {
"terms": {
"field": "age",
"size": 1000
},
"aggs": {
"ageBalanceaAvg": {
"avg": {
"field": "balance"
}
},
"genderGroup": {
"terms": {
"field": "gender.keyword",
"size": 10
},
"aggs": {
"balanceAvg": {
"avg": {
"field": "balance"
}
}
}
}
}
}
},
"size": 0
}
6.5Mapping
6.5.1Elasticsearch7.0之后移除Type的说明
- 关系型数据库中两个数据表示是独立的,即使他们里面有相同名称的列也不影响使用,但ES 中不是这样的。elasticsearch是基于Lucene开发的搜索引擎,而ES中不同type下名称相同的filed最终在Lucene中的处理方式是一样的。
- 两个不同type下的两个user_name,在ES同一个索引下其实被认为是同一个filed,你必须在两个不同的type中定义相同的filed映射。否则,不同type中的相同字段名称就会在 处理中出现冲突的情况,导致Lucene处理效率下降。
- 去掉type就是为了提高ES处理数据的效率。 • Elasticsearch 7.x • URL中的type参数为可选。比如,索引一个文档不再要求提供文档类型。 • Elasticsearch 8.x • 不再支持URL中的type参数。
- 解决:将索引从多类型迁移到单类型,每种类型文档一个独立索引。
6.5.2Mapping(映射) 概述
**Mapping 是用来定义一个文档(document),以及它所包含的属性(field)是如何存储和索引的。**当添加数据时,他会自动处理属性类型。
比如,使用 mapping 来定义:
- 哪些字符串属性应该被看做全文本属性(full text fields)。
- 哪些属性包含数字,日期或者地理位置。
- 文档中的所有属性是否都能被索引(_all 配置)。
- 日期的格式。
- 自定义映射规则来执行动态添加属性。
查看mapping信息:
GET /bank/_mapping
6.5.3自定义Mapping规则
如果存储数据之前,存储的数据类型并不是我们想要的,也可以修改索引下参数的Mapping规则
PUT /my_index
{
"mappings": {
"properties": {
"age":{"type": "integer"},
"email":{"type": "keyword"},
"name":{"type": "text"}
}
}
}
6.5.4添加Mapping规则
PUT /my_index/_mapping
{
"properties": {
"address": {
"type": "keyword"
}
}
}
6.5.5修改Mapping规则
对于已经存在的映射字段,我们不能更新。更新必须创建新的索引进行数据迁移
6.5.6数据迁移
- 对于之前的bank数据,是存在type的,现在可以使用数据迁移更改
-
新建映射规则
-
首先查询原先的数据Mapping规则:
GET /bank/_mapping
- 根据原先的数据的Mapping进行修改:
PUT /newbank
{
"mappings": {
"properties": {
"account_number": {
"type": "long"
},
"address": {
"type": "text"
},
"age": {
"type": "integer"
},
"balance": {
"type": "long"
},
"city": {
"type": "keyword"
},
"email": {
"type": "keyword"
},
"employer": {
"type": "keyword"
},
"firstname": {
"type": "text"
},
"gender": {
"type": "keyword"
},
"lastname": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"state": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
- 数据迁移
POST _reindex
{
"source": {
"index": "bank",
"type": "account"
},
"dest": {
"index": "newbank"
}
}
GET /newbank/_search
6.6分词
6.6.1分词器概述
一个 tokenizer(分词器)接收一个字符流,将之分割为独立的 tokens(词元,通常是独立的单词),然后输出 tokens 流。
例如,whitespace tokenizer 遇到空白字符时分割文本。它会将文本 “Quick brown fox!” 分割 为 [Quick, brown, fox!]。 该 tokenizer(分词器)还负责记录各个 term(词条)的顺序或 position 位置(用于 phrase 短 语和 word proximity 词近邻查询),以及 term(词条)所代表的原始 word(单词)的 start (起始)和 end(结束)的 character offsets(字符偏移量)(用于高亮显示搜索的内容)。
Elasticsearch 提供了很多内置的分词器,可以用来构建 custom analyzers(自定义分词器)。
6.6.2分词器案例
如下是使用标准分词器:
POST _analyze
{
"analyzer": "standard",
"text": "Hello,My name is Xuhuaiang"
}
6.6.3安装ik分词器
分词器是对于英文的,对于中文就不太友好,所以就可以安装ik分词器
进入到elasticsearch
的挂载目录plugins
下,创建文件夹,并将ik分词器文件放在此目录下并解压到指定目录:
unzip -d ik/ elasticsearch-analysis-ik-7.4.2.zip
重启docker容器,并测试ik分词器:
POST _analyze
{
"analyzer": "ik_smart",
"text": "我是中国人"
}
POST _analyze
{
"analyzer": "ik_max_word",
"text": "我是中国人"
}
6.6.4安装Nginx
- 拉取nginx镜像
docker pull nginx:1.18.0
- 首先创建一个nginx容器,只是为了复制出配置
# 1.运行容器
docker run -p 80:80 --name nginx -d nginx:1.18.0
# 2.将容器内的配置文件拷贝到当前/mydata中:
docker container cp nginx:/etc/nginx .
# 3.将文件nginx修改为conf
mv nginx conf
# 4.创建文件夹nginx
mkdir nginx
# 5.将conf目录拷贝到nginx目录
cp -r conf nginx/
# 6.删除conf目录
rm -rf conf
# 3.停止并删除容器
docker stop nginx && docker rm nginx
- 启动容器实例
docker run -d -p 80:80 \
--restart=always \
-v /mydata/nginx/html:/usr/share/nginx/html \
-v /mydata/nginx/logs:/var/log/nginx \
-v /mydata/nginx/conf:/etc/nginx \
--name nginx nginx:1.18.0
6.6.4自定义扩展词库
有时候ik分词器并不能按照我们想的那样进行分词,这个时候就需要进行自定义分词。
- 在刚刚创建的nginx的html目录下创建es目录,再创建文件,将分词内容放到文件中,重启nginx容器实例
- 修改ik分词器的配置
进入到/mydata/elasticsearch/plugins/ik/config
目录下,修改IKAnalyzer.cfg.xml
目录
填写远程扩展字典地址:
再重启es容器实例
- 测试
POST _analyze
{
"analyzer": "ik_smart",
"text": "徐**爱赵**"
}
7.Elasticsearch-Rest-Client配置
Elasticsearch Clients官网地址:Elasticsearch Clients | Elastic
这里使用Java Rest Clients的elasticsearch-rest-high-level-client
(RHLC)来操作ES
文档说明:[Index API | Java REST Client 7.17] | Elastic
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.2</version>
</dependency>
添加RHLC配置类
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfig {
// 通用设置项
public static final RequestOptions COMMON_OPTIONS;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
COMMON_OPTIONS = builder.build();
}
@Bean
public RestHighLevelClient esRestClient() {
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.26.160", 9200, "http")
)
);
return restHighLevelClient;
}
}
8.SpringBoot+RHLC测试
/**
* 保存数据到es
*/
@Test
public void saveData() throws IOException {
// 指定索引
IndexRequest indexRequest = new IndexRequest("users");
// 数据id
indexRequest.id("1");
User user = new User();
user.setUsername("zhangsan");
user.setGender("男");
user.setAge(20);
ObjectMapper mapper = new ObjectMapper();
// 将User对象转换为JSON数据
String userJsonData = mapper.writeValueAsString(user);
// 要保存的内容
indexRequest.source(userJsonData, XContentType.JSON);
// 执行操作
IndexResponse response = restHighLevelClient.index(indexRequest, ElasticsearchConfig.COMMON_OPTIONS);
System.out.println(response);
}
再kinbana上进行测试:
GET /users/_search
9.RHLC常用操作
9.1检索数据
步骤:
构建检索条件(SearchSourceBuilder
) -> 创建检索请求(指定索引,传入检索请求
) -> 执行检索
@Test
public void searchData() throws IOException {
// 1.构建检索条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 2.检索address字段中有mill的数据
searchSourceBuilder.query(QueryBuilders.matchQuery("address","mill"));
// 3.按照年龄的值分布进行聚合
TermsAggregationBuilder ageGroup = AggregationBuilders.terms("ageGroup").field("age").size(10);
// 4.聚合计算平均薪资
AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
searchSourceBuilder
.aggregation(ageGroup)
.aggregation(balanceAvg);
// 5.指定索引和检索条件
// 5.1创建检索请求
SearchRequest searchRequest = new SearchRequest();
searchRequest
.indices("bank")
.source(searchSourceBuilder.size(0));
// 6.执行检索
SearchResponse search = rhlc.search(searchRequest, RequestOptions.DEFAULT);
// 7.响应结果
log.info("检索结果:" + search);
}
9.2处理响应数据
@Data
static class Account{
private int account_number;
private int balance;
private String firstname;
private String lastname;
private int age;
private String gender;
private String address;
private String employer;
private String email;
private String city;
private String state;
}
@Test
public void searchData() throws IOException {
// 1.构建检索条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 2.检索address字段中有mill的数据
searchSourceBuilder.query(QueryBuilders.matchQuery("address", "mill"));
// 3.按照年龄的值分布进行聚合
TermsAggregationBuilder ageGroup = AggregationBuilders.terms("ageGroup").field("age").size(10);
// 4.聚合计算平均薪资
AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
searchSourceBuilder
.aggregation(ageGroup)
.aggregation(balanceAvg);
// 5.指定索引和检索条件
// 5.1创建检索请求
SearchRequest searchRequest = new SearchRequest();
searchRequest
.indices("bank")
.source(searchSourceBuilder);
// 6.执行检索
SearchResponse responseSearch = rhlc.search(searchRequest, RequestOptions.DEFAULT);
// 7.处理响应数据
SearchHits hits = responseSearch.getHits();
SearchHit[] hitsArray = hits.getHits();
Arrays.stream(hitsArray).map(hit -> {
String sourceAsString = hit.getSourceAsString();
ObjectMapper mapper = new ObjectMapper();
Account account = null;
try {
account = mapper.readValue(sourceAsString, Account.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
} finally {
return account;
}
}).forEach(account -> log.info("账户信息:" + account));
}
9.3处理聚合结果
@Data
static class Account {
private int account_number;
private int balance;
private String firstname;
private String lastname;
private int age;
private String gender;
private String address;
private String employer;
private String email;
private String city;
private String state;
}
@Test
public void searchData() throws IOException {
// 1.构建检索条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 2.检索address字段中有mill的数据
searchSourceBuilder.query(QueryBuilders.matchQuery("address", "mill"));
// 3.按照年龄的值分布进行聚合
TermsAggregationBuilder ageGroup = AggregationBuilders.terms("ageGroup").field("age").size(10);
// 4.聚合计算平均薪资
AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
searchSourceBuilder
.aggregation(ageGroup)
.aggregation(balanceAvg);
// 5.指定索引和检索条件
// 5.1创建检索请求
SearchRequest searchRequest = new SearchRequest();
searchRequest
.indices("bank")
.source(searchSourceBuilder);
// 6.执行检索
SearchResponse responseSearch = rhlc.search(searchRequest, RequestOptions.DEFAULT);
// 7.处理聚合结果
Aggregations aggregations = responseSearch.getAggregations();
Terms ageGroups = aggregations.get("ageGroup");
for (Terms.Bucket bucket : ageGroups.getBuckets()) {
String key = bucket.getKeyAsString();
log.info("年龄:" + key + "===>" + "一共" + bucket.getDocCount() + "人。");
}
Avg balanceAvgs = aggregations.get("balanceAvg");
log.info("平均薪资:" + balanceAvgs.getValue());
}
10.案例:定时任务清除ELK日志监控系统中每天的日志索引
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexRequest;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import com.vector.common.constants.SystemInfoConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Component
@Slf4j
public class SystemLog {
private ElasticsearchClient elasticsearchClient;
private static final String SYSTEM_LOG_INDEX_PREFIX_MATCH = SystemInfoConstants.SYSTEM_LOG_INDEX_PREFIX + "*";
public SystemLog(ElasticsearchClient elasticsearchClient) {
this.elasticsearchClient = elasticsearchClient;
}
/**
* 每月1号删除一个月前的日志
*
* @throws IOException
*/
@Scheduled(cron = "0 0 0 1 * ?")
public void systemLog() throws IOException {
Set<String> systemLogIndexLists = searchSystemLogIndex();
if(systemLogIndexLists.isEmpty()){
log.info("没有找到索引{}", systemLogIndexLists);
return;
}
LocalDateTime oneMonthBefore = LocalDateTime.now().minusMonths(1);
StringBuilder oneMonthFormat = new StringBuilder(oneMonthBefore.format(DateTimeFormatter.ISO_DATE));
oneMonthFormat.insert(0, SystemInfoConstants.SYSTEM_LOG_INDEX_PREFIX);
//删除一个月前的索引
List<String> ids = systemLogIndexLists.stream()
.filter(currentIndexDate -> {
// 2020-03-25 > 2020-02-25 true
return oneMonthFormat.toString().compareTo(currentIndexDate) > 0;
})
.collect(Collectors.toList());
if (ids.isEmpty()) {
log.info("没有需要删除的索引{}", ids);
return;
}
DeleteIndexResponse result = elasticsearchClient.indices().delete(DeleteIndexRequest.of(id -> id
.index(ids)));
log.info("删除索引{},结果:{}", ids, result.toString());
}
/**
* 查询所有SystemLog索引
*
* @return
* @throws IOException
*/
private Set<String> searchSystemLogIndex() throws IOException {
// 查询yiqichang-*的索引
GetIndexResponse response = elasticsearchClient.indices()
.get(GetIndexRequest.of(idx -> idx.index(SYSTEM_LOG_INDEX_PREFIX_MATCH)));
Set<String> result = response.result().keySet();
log.info("查询到的索引:{}", result);
return result;
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)