该文章已同步收录到我的博客网站,欢迎浏览我的博客网站,xhang’s blog

1.ElasticSearch简介

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,**它可以快速地储存、搜索和分析海量数据。**作为 Elastic Stack 的核心,Elasticsearch 会集中存储您的数据,让您飞快完成搜索,微调相关性,进行强大的分析,并轻松缩放规模。

elasticsearch官网:Elasticsearch:官方分布式搜索和分析引擎 | Elastic

官方文档地址:Elasticsearch Guide | Elastic

image-20230110155900267

2.基本概念

  1. index(索引)

    动词,相当于mysql的insert

    名词,相当于mysql的Database

  2. Type(类型)

​ 在index(索引)中,可以定义一个或多个类型。类似于MySQL当中的Table,每一种类型的数据放在一起。

  1. Document(文档)

​ 保存在某个索引(Index) 下,某种类型(Type) 的一个数据(Document) ,文档是JSON格式的,Document就像 是MySQL中的某个Table里面的内容;

image-20230110161820114

3.Elasticsearch概念-倒排索引

  1. 分词

​ 将整句分拆为单词

  1. 报错的记录

    • 红海行动

    • 探索红海行动

    • 红海特别行动

    • 红海记录篇

    • 特工红海特别探索

  2. 检索

    • 红海特工行动
    • 红海行动
  3. 相关性得分

记录
红海1,2,3,4,5
行动1,2,3
探索2,5
特别3,5
记录篇4
特工5

4.Elasticsearch和Kibana的安装

  1. 拉取镜像
docker pull elasticsearch:7.4.2  //存储和检索数据
docker pull kibana:7.4.2		 //可视化检索数据
  1. 创建挂载目录
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
// 任何远程机器都能访问es
echo "http.host: 0.0.0.0" >> /mydata/elasticsearch/config/elasticsearch.yml
chmod -R 777 /mydata/elasticsearch/ 改变文件权限

image-20230110165356531

  1. 创建elasticsearch容器实例
docker run -d -p 9200:9200 -p 9300:9300 \
--restart=always \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms64m -Xmx512m" \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
--name elasticsearch elasticsearch:7.4.2

命令解释:

-e “discovery.type=single-node”:使es单节点运行

-e ES_JAVA_OPTS=“-Xms64m -Xmx512m”:设置es占用的内存

  1. 开放对应的端口
firewall-cmd --zone=public --add-port=9200/tcp --permanent
firewall-cmd --zone=public --add-port=9300/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --zone=public --query-port=9200/tcp
  1. 访问9200端口测试

  1. 创建kibana容器实例
docker run -d -p 5601:5601 \
--restart=always \
-e ELASTICSEARCH_HOSTS=http://192.168.26.160:9200 \
--name kibana \
kibana:7.4.2
  1. 开放对应端口
firewall-cmd --zone=public --add-port=5601/tcp --permanent
systemctl restart firewalld.service
  1. 访问对应的5601端口

image-20230110172219513

5.Elasticsearch入门操作

5.1_cat

GET /_cat/nodes:查看所有节点

GET /_cat/health:查看 es 健康状况

GET /_cat/master:查看主节点

GET /_cat/indices:查看所有索引

​ 相当于Mysql数据库的show databases

image-20230110173359452

5.2PUT&POST新增数据

PUT 和 POST 都可以, POST 新增。如果不指定 id,会自动生成 id。指定 id 就会修改这个数据,并新增版本号 。

PUT 可以新增可以修改。PUT 必须指定 id;由于 PUT 需要指定 id,我们一般都用来做修改操作,不指定 id 会报错。

保存一个数据,保存在哪个索引的哪个类型下,指定用哪个唯一标识 PUT customer/external/1;

在 customer 索引下的 external 类型下保存 1 号数据为

POST /customer/external/1

保存的数据

{ 
    "name": "zhangsan"
}
image-20230110174416963

如果当前索引不存在就会自动创建:

image-20230110173859019

5.3PUT&POST修改数据

  1. POST /customer/external/1/_update
{
    "doc": {
        "name": "lisi"
    }
}

此种方式会查询数据,如果相同不允许修改

image-20230110180842679
  1. POST /customer/external/1/
{
    "name": "zhangsan"
}
image-20230110181642912
  1. PUT /customer/external/1/
{
    "name": "zhangsan"
}
image-20230110181725160

5.4GET查询数据

GET /索引/类型/数据id
image-20230110174933231

结果:

{
    "_index": "customer",//在哪个索引 
    "_type": "external",//在哪个类型
    "_id": "1", //记录 id
    "_version": 6,//版本号
    "_seq_no": 5,//并发控制字段,每次更新就会+1,用来做乐观锁
    "_primary_term": 1,//同上,主分片重新分配,如重启,就会变化
    "found": true,  //查询是否成功
    "_source": {    //真正的内容
        "name": "zhangsan"
    }
}

5.5DELETE删除数据

DELETE /customer/external/1

image-20230110182659261

5.7bulk批量操作

POST /customer/external/_bulk

{"index":{"_id":"1"}}
{"name":"John Doe"}
{"index":{"_id":"2"}}
{"name":"Jane Doe"}

语法格式:

{ action: { metadata }}\n
{ request body }\n
{ action: { metadata }}\n
{ request body }\n

切换到kibana的Dev Tools指定命令

5.6乐观锁字段

对应GET的查询结果,其中有一个字段_sep_no,其为了是进行并发控制,当修改数据后,对应的_sep_no版本就会更改。

实现方式:

更新携带 ?if_seq_no=*&if_primary_term=1

image-20230110180359046

​ 再次指定_sep_no为相同值将不会生效

image-20230110180516407

6.Elasticsearch进阶操作

6.1批量导入测试数据

POST /bank/account/_bulk

测试数据地址:https://gitee.com/xu-huaiang/codes/nbqcg1dsfh6vutk4o8mxy65

image-20230110190529972

6.2Search API

ES 支持两种基本方式检索 :

  • 一个是通过使用 REST request URI 发送搜索参数(uri+检索参数)

  • 另一个是通过使用 REST request body 来发送它们(uri+请求体)

一切检索从_search开始

  • GET bank/_search

​ 检索 bank 下所有信息,包括 type 和 docs

  • GET bank/_search?q=*&sort=account_number:asc

​ 请求参数方式检索(按照account_number升序排列)

  • uri+请求体进行检索
GET bank/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "account_number": "asc"
    },
    {
      "balance": "desc"
    }
  ]
}

6.3Query DSL

Elasticsearch 提供了一个可以执行查询的 Json 风格的 DSL(domain-specific language 领域特定语言)。这个被称为 Query DSL。该查询语言非常全面,并且刚开始的时候感觉有点复杂。

  • 一个查询语句的典型结构
{
	QUERY_NAME: {
		ARGUMENT: VALUE,
        ARGUMENT: VALUE,...
    }
}
  • 如果是针对某个字段,那么它的结构如下
{
	QUERY_NAME(查询条件): {
		FIELD_NAME(字段名): {
			ARGUMENT(参数名): VALUE(参数值), 
            ARGUMENT: VALUE,... 
    	}
  	}
}

例子:

image-20230110210852267

GET /bank/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "balance": {
        "order": "desc"
      }
    }
  ],
  "from": 0,
  "size": 3
}

查询结果:

image-20230110210942941

6.3.1_source返回部分字段

可以采用_source字段规定查询的参数名

GET /bank/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "balance": {
        "order": "desc"
      }
    }
  ],
  "_source": ["firstname","balance"],
  "from": 0,
  "size": 3
}
image-20230110211334059

6.3.2match精确/模糊查询

  • 可以精确匹配字段值
GET /bank/_search
{
  "query": {
    "match": {
      "account_number": "1"
    }
  }
}

即精确匹配account_number为1的数据

image-20230110212030602

  • 也可以模糊匹配 ,匹配不区分大小写,排序按照倒排索引排序

image-20230110213230711

  • 也可以在参数上使用keyword进行精确匹配,并且区分大小写
GET /bank/_search
{
  "query": {
    "match": {
      "address.keyword": "198 Mill Lane"
    }
  }
}

image-20230112095542332

6.3.3match_phrase分组匹配

将需要匹配的值当成一个整体单词(不分词)进行检索

GET /bank/_search
{
  "query": {
    "match_phrase": {
      "address": "mill lane"
    }
  }
}

image-20230110214041190

6.3.4multi_match多字段匹配

对指定的多个字段进行关键字配置

GET /bank/_search
{
  "query": {
    "multi_match": {
      "query": "mill movico",
      "fields": ["address","city"]
    }
  }
}

image-20230110214807495

6.3.5bool复合查询

bool 用来做复合查询:

复合语句可以合并任何其它查询语句,包括复合语句,了解这一点是很重要的。这就意味着,复合语句之间可以互相嵌套,可以表达非常复杂的逻辑。

GET /bank/_search
{
  "query": {
    "bool": {
      "must": [ //必须满足的条件
        {
          "match": { //参数匹配
            "gender": "F"
          }
        },
        {
          "match": { //参数匹配
            "address": "mill"
          }
        }
      ],
      "must_not": [ //必须不满足的
        {
          "match": {//参数匹配
            "age": "38"
          }
        }
      ], 
      "should": [  //应该满足的
        {
          "match": { //参数匹配
            "lastname": "Long"
          }
        }
      ]
    }
  }
}

6.3.6filter结果过滤

filter与之前的must不同的是,filter不会计算相关性得分

并不是所有的查询都需要产生分数,特别是那些仅用于 “filtering”(过滤)的文档。为了不计算分数 Elasticsearch 会自动检查场景并且优化查询的执行。

测试:查询年龄在10到20之间的数据

GET /bank/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "age": {
              "gte": 10,
              "lte": 20
            }
          }
        }
      ]
    }
  }
}

不使用filter,会有相关项得分

image-20230112093507483

使用filter,相关性得分为0

image-20230112093649047

6.3.7term查询

和 match 一样。匹配某个属性的值。全文检索字段用 match,其他非 text(文本)字段匹配用 term

GET /bank/_search
{
  "query": {
    "term": {
      "age": {
        "value": "22"
      }
    }
  }
}

6.4执行聚合(aggregations)

6.4.1执行聚合的概念

**聚合提供了从数据中分组和提取数据的能力。**最简单的聚合方法大致等于 SQL GROUP BY 和 SQL 聚合函数。

在 Elasticsearch 中,您有执行搜索返回 hits(命中结果),并且同时返回聚合结果,把一个响应中的所有 hits(命中结果)分隔开的能力。这是非常强大且有效的, 您可以执行查询和多个聚合,并且在一次使用中得到各自的(任何一个的)返回结果,使用 一次简洁和简化的 API 来避免网络往返。

执行聚合文档地址:[Aggregations | Elasticsearch Guide 7.5] | Elastic

6.4.2测试

搜索 address 中包含 mill 的所有人的年龄分布以及平均年龄,但不显示这些人的详情。

GET /bank/_search
{
  "query": {
    "match": {
      "address": "mill"
    }
  },
  "aggs": {
    "ageData": {
      "terms": {  //terms执行聚合age参数,并查询前5条数据
        "field": "age",
        "size": 5
      }
    },
    "ageAvg": {   //平均年龄
      "avg": {
        "field": "age"
      }
    }
  },
  "size": 0  //不显示搜索结果
}

6.4.3子聚合

按年龄进行分组,并且计算出组内的平均薪资

在聚合当中再使用聚合进行查询

GET /bank/_search
{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "aggGroup": {
      "terms": {
        "field": "age",
        "size": 1000
      },
      "aggs": {
        "ageAvg": {
          "avg": {
            "field": "balance"
          }
        }
      }
    }
  },
  "size": 0
}

image-20230112104222513

6.4.4综合测试

按照年龄段进行分组,并求出该年龄段的平均信息,和该年龄段的男女数和各男女的平均薪资

GET /bank/_search
{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "aggGroup": {
      "terms": {
        "field": "age",
        "size": 1000
      },
      "aggs": {
        "ageBalanceaAvg": {
          "avg": {
            "field": "balance"
          }
        },
        "genderGroup": {
          "terms": {
            "field": "gender.keyword",
            "size": 10
          },
          "aggs": {
            "balanceAvg": {
              "avg": {
                "field": "balance"
              }
            }
          }
        }
      }
    }
  },
  "size": 0
}

image-20230112105337998

6.5Mapping

6.5.1Elasticsearch7.0之后移除Type的说明

  • 关系型数据库中两个数据表示是独立的,即使他们里面有相同名称的列也不影响使用,但ES 中不是这样的。elasticsearch是基于Lucene开发的搜索引擎,而ES中不同type下名称相同的filed最终在Lucene中的处理方式是一样的。
  • 两个不同type下的两个user_name,在ES同一个索引下其实被认为是同一个filed,你必须在两个不同的type中定义相同的filed映射。否则,不同type中的相同字段名称就会在 处理中出现冲突的情况,导致Lucene处理效率下降。
  • 去掉type就是为了提高ES处理数据的效率。 • Elasticsearch 7.x • URL中的type参数为可选。比如,索引一个文档不再要求提供文档类型。 • Elasticsearch 8.x • 不再支持URL中的type参数。
  • 解决:将索引从多类型迁移到单类型,每种类型文档一个独立索引。

6.5.2Mapping(映射) 概述

**Mapping 是用来定义一个文档(document),以及它所包含的属性(field)是如何存储和索引的。**当添加数据时,他会自动处理属性类型。

比如,使用 mapping 来定义:

  • 哪些字符串属性应该被看做全文本属性(full text fields)。
  • 哪些属性包含数字,日期或者地理位置。
  • 文档中的所有属性是否都能被索引(_all 配置)。
  • 日期的格式。
  • 自定义映射规则来执行动态添加属性。

查看mapping信息:

GET /bank/_mapping

image-20230112111153887

6.5.3自定义Mapping规则

如果存储数据之前,存储的数据类型并不是我们想要的,也可以修改索引下参数的Mapping规则

PUT /my_index
{
  "mappings": {
    "properties": {
      "age":{"type": "integer"},
      "email":{"type": "keyword"},
      "name":{"type": "text"}
    }
  }
}

6.5.4添加Mapping规则

PUT /my_index/_mapping    
{
  "properties": {
    "address": {
      "type": "keyword"
    }
  }
}

6.5.5修改Mapping规则

对于已经存在的映射字段,我们不能更新。更新必须创建新的索引进行数据迁移

6.5.6数据迁移

  1. 对于之前的bank数据,是存在type的,现在可以使用数据迁移更改

image-20230112113822034

  1. 新建映射规则

  2. 首先查询原先的数据Mapping规则:

GET /bank/_mapping
  1. 根据原先的数据的Mapping进行修改:
PUT /newbank
{
  "mappings": {
    "properties": {
      "account_number": {
        "type": "long"
      },
      "address": {
        "type": "text"
      },
      "age": {
        "type": "integer"
      },
      "balance": {
        "type": "long"
      },
      "city": {
        "type": "keyword"
      },
      "email": {
        "type": "keyword"
      },
      "employer": {
        "type": "keyword"
      },
      "firstname": {
        "type": "text"
      },
      "gender": {
        "type": "keyword"
      },
      "lastname": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "state": {
        "type": "keyword",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}
  1. 数据迁移
POST _reindex
{
  "source": {
    "index": "bank",
    "type": "account"
  },
  "dest": {
    "index": "newbank"
  }

}

GET /newbank/_search

image-20230112115252817

6.6分词

6.6.1分词器概述

一个 tokenizer(分词器)接收一个字符流,将之分割为独立的 tokens(词元,通常是独立的单词),然后输出 tokens 流。

例如,whitespace tokenizer 遇到空白字符时分割文本。它会将文本 “Quick brown fox!” 分割 为 [Quick, brown, fox!]。 该 tokenizer(分词器)还负责记录各个 term(词条)的顺序或 position 位置(用于 phrase 短 语和 word proximity 词近邻查询),以及 term(词条)所代表的原始 word(单词)的 start (起始)和 end(结束)的 character offsets(字符偏移量)(用于高亮显示搜索的内容)。

Elasticsearch 提供了很多内置的分词器,可以用来构建 custom analyzers(自定义分词器)。

6.6.2分词器案例

如下是使用标准分词器:

POST _analyze
{
  "analyzer": "standard",
  "text": "Hello,My name is Xuhuaiang"
}

image-20230112134406494

6.6.3安装ik分词器

分词器是对于英文的,对于中文就不太友好,所以就可以安装ik分词器

Github官网:medcl/elasticsearch-analysis-ik: The IK Analysis plugin integrates Lucene IK analyzer into elasticsearch, support customized dictionary. (github.com)

image-20230112142437126

进入到elasticsearch的挂载目录plugins下,创建文件夹,并将ik分词器文件放在此目录下并解压到指定目录:

unzip -d ik/ elasticsearch-analysis-ik-7.4.2.zip

重启docker容器,并测试ik分词器:

POST _analyze
{
  "analyzer": "ik_smart",
  "text": "我是中国人"
}

image-20230112143721266

POST _analyze
{
  "analyzer": "ik_max_word",
  "text": "我是中国人"
}

image-20230112144312223

6.6.4安装Nginx

  1. 拉取nginx镜像
docker pull nginx:1.18.0
  1. 首先创建一个nginx容器,只是为了复制出配置
# 1.运行容器
docker run -p 80:80 --name nginx -d nginx:1.18.0

# 2.将容器内的配置文件拷贝到当前/mydata中:
docker container cp nginx:/etc/nginx .
# 3.将文件nginx修改为conf
mv nginx conf
# 4.创建文件夹nginx
mkdir nginx
# 5.将conf目录拷贝到nginx目录
cp -r conf nginx/
# 6.删除conf目录
rm -rf conf
# 3.停止并删除容器
docker stop nginx && docker rm nginx 
  1. 启动容器实例
docker run -d -p 80:80 \
--restart=always \
-v /mydata/nginx/html:/usr/share/nginx/html \
-v /mydata/nginx/logs:/var/log/nginx \
-v /mydata/nginx/conf:/etc/nginx \
--name nginx nginx:1.18.0

6.6.4自定义扩展词库

有时候ik分词器并不能按照我们想的那样进行分词,这个时候就需要进行自定义分词。

  1. 在刚刚创建的nginx的html目录下创建es目录,再创建文件,将分词内容放到文件中,重启nginx容器实例

image-20230112164230835

  1. 修改ik分词器的配置

​ 进入到/mydata/elasticsearch/plugins/ik/config目录下,修改IKAnalyzer.cfg.xml目录

​ 填写远程扩展字典地址:

image-20230112163727811

​ 再重启es容器实例

  1. 测试
POST _analyze
{
  "analyzer": "ik_smart",
  "text": "徐**爱赵**"
}

image-20230112164646016

7.Elasticsearch-Rest-Client配置

Elasticsearch Clients官网地址:Elasticsearch Clients | Elastic

这里使用Java Rest Clients的elasticsearch-rest-high-level-client(RHLC)来操作ES

文档说明:[Index API | Java REST Client 7.17] | Elastic

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.4.2</version>
</dependency>

添加RHLC配置类

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {

//    通用设置项
    public static final RequestOptions COMMON_OPTIONS;
    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        COMMON_OPTIONS = builder.build();
    }

    @Bean
    public RestHighLevelClient esRestClient() {
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("192.168.26.160", 9200, "http")
                )
        );
        return restHighLevelClient;
    }
}

8.SpringBoot+RHLC测试

    /**
     * 保存数据到es
     */
    @Test
    public void saveData() throws IOException {
//        指定索引
        IndexRequest indexRequest = new IndexRequest("users");
//        数据id
        indexRequest.id("1");
        User user = new User();
        user.setUsername("zhangsan");
        user.setGender("男");
        user.setAge(20);
        ObjectMapper mapper = new ObjectMapper();
//        将User对象转换为JSON数据
        String userJsonData = mapper.writeValueAsString(user);
//        要保存的内容
        indexRequest.source(userJsonData, XContentType.JSON);

//        执行操作
        IndexResponse response = restHighLevelClient.index(indexRequest, ElasticsearchConfig.COMMON_OPTIONS);

        System.out.println(response);
    }

image-20230112211303942

再kinbana上进行测试:

GET /users/_search

image-20230112211650049

9.RHLC常用操作

9.1检索数据

步骤:

构建检索条件(SearchSourceBuilder) -> 创建检索请求(指定索引,传入检索请求) -> 执行检索

    @Test
    public void searchData() throws IOException {
//        1.构建检索条件
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//        2.检索address字段中有mill的数据
        searchSourceBuilder.query(QueryBuilders.matchQuery("address","mill"));
//        3.按照年龄的值分布进行聚合
        TermsAggregationBuilder ageGroup = AggregationBuilders.terms("ageGroup").field("age").size(10);
//        4.聚合计算平均薪资
        AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");

        searchSourceBuilder
                .aggregation(ageGroup)
                .aggregation(balanceAvg);


//        5.指定索引和检索条件
//          5.1创建检索请求
        SearchRequest searchRequest = new SearchRequest();
        searchRequest
                .indices("bank")
                .source(searchSourceBuilder.size(0));

//        6.执行检索
        SearchResponse search = rhlc.search(searchRequest, RequestOptions.DEFAULT);

//        7.响应结果
        log.info("检索结果:" + search);
    }

image-20230112215214509

9.2处理响应数据

    @Data
    static class Account{
        private int account_number;
        private int balance;
        private String firstname;
        private String lastname;
        private int age;
        private String gender;
        private String address;
        private String employer;
        private String email;
        private String city;
        private String state;
    }

    @Test
    public void searchData() throws IOException {
//        1.构建检索条件
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//        2.检索address字段中有mill的数据
        searchSourceBuilder.query(QueryBuilders.matchQuery("address", "mill"));
//        3.按照年龄的值分布进行聚合
        TermsAggregationBuilder ageGroup = AggregationBuilders.terms("ageGroup").field("age").size(10);
//        4.聚合计算平均薪资
        AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");

        searchSourceBuilder
                .aggregation(ageGroup)
                .aggregation(balanceAvg);


//        5.指定索引和检索条件
//          5.1创建检索请求
        SearchRequest searchRequest = new SearchRequest();
        searchRequest
                .indices("bank")
                .source(searchSourceBuilder);

//        6.执行检索
        SearchResponse responseSearch = rhlc.search(searchRequest, RequestOptions.DEFAULT);

//        7.处理响应数据
        SearchHits hits = responseSearch.getHits();
        SearchHit[] hitsArray = hits.getHits();
        Arrays.stream(hitsArray).map(hit -> {
            String sourceAsString = hit.getSourceAsString();
            ObjectMapper mapper = new ObjectMapper();
            Account account = null;
            try {
                account = mapper.readValue(sourceAsString, Account.class);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } finally {
                return account;
            }
        }).forEach(account -> log.info("账户信息:" + account));
    }

image-20230112222829002

9.3处理聚合结果

    @Data
    static class Account {
        private int account_number;
        private int balance;
        private String firstname;
        private String lastname;
        private int age;
        private String gender;
        private String address;
        private String employer;
        private String email;
        private String city;
        private String state;
    }
    

	@Test
    public void searchData() throws IOException {
//        1.构建检索条件
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//        2.检索address字段中有mill的数据
        searchSourceBuilder.query(QueryBuilders.matchQuery("address", "mill"));
//        3.按照年龄的值分布进行聚合
        TermsAggregationBuilder ageGroup = AggregationBuilders.terms("ageGroup").field("age").size(10);
//        4.聚合计算平均薪资
        AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");

        searchSourceBuilder
                .aggregation(ageGroup)
                .aggregation(balanceAvg);


//        5.指定索引和检索条件
//          5.1创建检索请求
        SearchRequest searchRequest = new SearchRequest();
        searchRequest
                .indices("bank")
                .source(searchSourceBuilder);

//        6.执行检索
        SearchResponse responseSearch = rhlc.search(searchRequest, RequestOptions.DEFAULT);

//        7.处理聚合结果
        Aggregations aggregations = responseSearch.getAggregations();
        Terms ageGroups = aggregations.get("ageGroup");
        for (Terms.Bucket bucket : ageGroups.getBuckets()) {
            String key = bucket.getKeyAsString();
            log.info("年龄:" + key + "===>" + "一共" + bucket.getDocCount() + "人。");
        }

        Avg balanceAvgs = aggregations.get("balanceAvg");
        log.info("平均薪资:" + balanceAvgs.getValue());
    }

image-20230112230105920

10.案例:定时任务清除ELK日志监控系统中每天的日志索引

import co.elastic.clients.elasticsearch.ElasticsearchClient;

import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexRequest;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import com.vector.common.constants.SystemInfoConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Component
@Slf4j
public class SystemLog {

    private ElasticsearchClient elasticsearchClient;
    private static final String SYSTEM_LOG_INDEX_PREFIX_MATCH = SystemInfoConstants.SYSTEM_LOG_INDEX_PREFIX + "*";
    public SystemLog(ElasticsearchClient elasticsearchClient) {
        this.elasticsearchClient = elasticsearchClient;
    }

    /**
     * 每月1号删除一个月前的日志
     *
     * @throws IOException
     */
    @Scheduled(cron = "0 0 0 1 * ?")
    public void systemLog() throws IOException {
        Set<String> systemLogIndexLists = searchSystemLogIndex();
        if(systemLogIndexLists.isEmpty()){
            log.info("没有找到索引{}", systemLogIndexLists);
            return;
        }
        LocalDateTime oneMonthBefore = LocalDateTime.now().minusMonths(1);
        StringBuilder oneMonthFormat = new StringBuilder(oneMonthBefore.format(DateTimeFormatter.ISO_DATE));
        oneMonthFormat.insert(0, SystemInfoConstants.SYSTEM_LOG_INDEX_PREFIX);
        //删除一个月前的索引
        List<String> ids = systemLogIndexLists.stream()
                .filter(currentIndexDate -> {
                    // 2020-03-25 > 2020-02-25 true
                    return oneMonthFormat.toString().compareTo(currentIndexDate) > 0;
                })
                .collect(Collectors.toList());

        if (ids.isEmpty()) {
            log.info("没有需要删除的索引{}", ids);
            return;
        }
        DeleteIndexResponse result = elasticsearchClient.indices().delete(DeleteIndexRequest.of(id -> id
                .index(ids)));
        log.info("删除索引{},结果:{}", ids, result.toString());
    }

    /**
     * 查询所有SystemLog索引
     *
     * @return
     * @throws IOException
     */
    private Set<String> searchSystemLogIndex() throws IOException {
        // 查询yiqichang-*的索引
        GetIndexResponse response = elasticsearchClient.indices()
                .get(GetIndexRequest.of(idx -> idx.index(SYSTEM_LOG_INDEX_PREFIX_MATCH)));
        Set<String> result = response.result().keySet();
        log.info("查询到的索引:{}", result);
        return result;
    }

}
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐