Github 地址:https://github.com/elastic/elasticsearch-py/blob/master/docs/index.rst

官网地址:https://elasticsearch-py.readthedocs.io/en/latest/index.html

Python-ElasticSearch,python对ES进行写入、更新、删除、搜索:https://blog.csdn.net/u013429010/article/details/81746179

Python3 操作 elasticsearch:https://blog.csdn.net/qq_41262248/article/details/100671930

Elasticsearch 简介

想查数据就免不了搜索,搜索就离不开搜索引擎,百度、谷歌都是一个非常庞大复杂的搜索引擎,他们几乎索引了互联网上开放的所有网页和数据。然而对于我们自己的业务数据来说,肯定就没必要用这么复杂的技术了,如果我们想实现自己的搜索引擎,方便存储和检索,Elasticsearch 就是不二选择,它是一个全文搜索引擎,可以快速地 储存搜索 和 分析 海量数据。

为什么要用 Elasticsearch ?

Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上。

那 Lucene 又是什么?Lucene 可能是目前存在的,不论开源还是私有的,拥有最先进,高性能和全功能搜索引擎功能的库,但也仅仅只是一个库。要用上 Lucene,我们需要编写 Java 并引用 Lucene 包才可以,而且我们需要对信息检索有一定程度的理解才能明白 Lucene 是怎么工作的,反正用起来没那么简单。

那么为了解决这个问题,Elasticsearch 就诞生了。Elasticsearch 也是使用 Java 编写的,它的内部使用 Lucene 做索引与搜索,但是它的目标是使全文检索变得简单,相当于 Lucene 的一层封装,它提供了一套简单一致的 RESTful API 来帮助我们实现存储和检索。

所以 Elasticsearch 仅仅就是一个简易版的 Lucene 封装吗?那就大错特错了,Elasticsearch 不仅仅是 Lucene,并且也不仅仅只是一个全文搜索引擎。 它可以被下面这样准确的形容:

  • 一个分布式的实时文档存储,每个字段可以被索引与搜索
  • 一个分布式实时分析搜索引擎
  • 能胜任上百个服务节点的扩展,并支持 PB 级别的结构化或者非结构化数据

总之,是一个相当牛逼的搜索引擎,维基百科、Stack Overflow、GitHub 都纷纷采用它来做搜索。

Elasticsearch 相关概念

Node 和 Cluster

Elasticsearch 本质上是一个分布式数据库,允许多台服务器协同工作,每台服务器可以运行多个 Elasticsearch 实例。

单个 Elasticsearch 实例称为一个节点(Node)。一组节点构成一个集群(Cluster)。

Index

Elasticsearch 会索引所有字段,经过处理后写入一个反向索引(Inverted Index)。查找数据的时候,直接查找该索引。

所以,Elasticsearch 数据管理的顶层单位就叫做 Index(索引),其实就相当于 MySQL、MongoDB 等里面的数据库的概念。
另外值得注意的是,每个 Index (即数据库)的名字必须是小写。

Document

Index 里面单条的记录称为 Document(文档)。许多条 Document 构成了一个 Index。

Document 使用 JSON 格式表示,下面是一个例子。

同一个 Index 里面的 Document,不要求有相同的结构(scheme),但是最好保持相同,这样有利于提高搜索效率。

Type

Document 可以分组,比如 weather 这个 Index 里面,可以按城市分组(北京和上海),也可以按气候分组(晴天和雨天)。
这种分组就叫做 Type,它是虚拟的逻辑分组,用来过滤 Document,类似 MySQL 中的数据表,MongoDB 中的 Collection。

不同的 Type 应该有相似的结构(Schema),举例来说,id 字段不能在这个组是字符串,在另一个组是数值。
这是与关系型数据库的表的一个区别。
性质完全不同的数据(比如 products 和 logs)应该存成两个 Index,而不是一个 Index 里面的两个 Type(虽然可以做到)。

根据规划,Elastic 6.x 版只允许每个 Index 包含一个 Type7.x 版将会彻底移除 Type

Fields

field 即字段,每个 Document 都类似一个 JSON 结构,它包含了许多字段,每个字段都有其对应的值,
多个字段组成了一个 Document,其实就可以类比 MySQL 数据表中的字段。

在 Elasticsearch 中,文档归属于一种类型(Type),而这些类型存在于索引(Index)中,我们可以画一些简单的对比图来类比传统关系型数据库:

关系型数据库    -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices   -> Types  -> Documents -> Fields

以上就是 Elasticsearch 里面的一些基本概念,通过和关系性数据库的对比更加有助于理解。

Python Elasticsearch Client

        elasticsearch-py 是一个正式的低级别的 Elasticsearch 客户端。它的目标是为所有与 elasticsearch 相关的 Python 代码提供公共基础。正因为如此,它试图做到无意见和可扩展。

        对于更高级、范围更有限的客户端库,请查看 elasticsearch-dsl ( https://elasticsearch-dsl.readthedocs.io/en/latest/ ),它是一个位于 elasticsearch-py 之上的更 python 化的库。

兼容性

该库与所有的Elasticsearch版本兼容(从0.90开始)。但你必须使用一个匹配的主要版本:

For Elasticsearch 6.0 and later, use the major version 6 (6.x.y) of the library.

For Elasticsearch 5.0 and later, use the major version 5 (5.x.y) of the library.

For Elasticsearch 2.0 and later, use the major version 2 (2.x.y) of the library, and so on.

The recommended way to set your requirements in your setup.py or requirements.txt is:

# Elasticsearch 6.x
elasticsearch>=6.0.0,<7.0.0

# Elasticsearch 5.x
elasticsearch>=5.0.0,<6.0.0

# Elasticsearch 2.x
elasticsearch>=2.0.0,<3.0.0

安装

使用 pip 安装 elasticsearch  包

pip install elasticsearch
# 豆瓣源
pip install elasticsearch -i https://pypi.doubanio.com/simple/

Python 连接 elasticsearch

Python 连接 elasticsearch 有以下几种连接方式:

from elasticsearch import  Elasticsearch
# es = Elasticsearch()    # 默认连接本地elasticsearch
# es = Elasticsearch(['127.0.0.1:9200'])  # 连接本地9200端口
es = Elasticsearch(
    ["192.168.1.10", "192.168.1.11", "192.168.1.12"], # 连接集群,以列表的形式存放各节点的IP地址
    sniff_on_start=True,    # 连接前测试
    sniff_on_connection_fail=True,  # 节点无响应时刷新节点
    sniff_timeout=60    # 设置超时时间
)

指定连接

es = Elasticsearch(
    ['172.16.153.129:9200'],
    # 认证信息
    # http_auth=('elastic', 'changeme')
)

动态连接

es = Elasticsearch(
    ['esnode1:port', 'esnode2:port'],
    # 在做任何操作之前,先进行嗅探
    sniff_on_start=True,
    # 节点没有响应时,进行刷新,重新连接
    sniff_on_connection_fail=True,
    # 每 60 秒刷新一次
    sniffer_timeout=60
)

对不同的节点,赋予不同的参数

es = Elasticsearch([
    {'host': 'localhost'},
    {'host': 'othernode', 'port': 443, 'url_prefix': 'es', 'use_ssl': True},
])

假如使用了 ssl

es = Elasticsearch(
    ['localhost:443', 'other_host:443'],
    #打开SSL 
    use_ssl=True,
    #确保我们验证了SSL证书(默认关闭)
    verify_certs=True,
    #提供CA证书的路径
    ca_certs='/path/to/CA_certs',
    #PEM格式的SSL客户端证书
    client_cert='/path/to/clientcert.pem',
    #PEM格式的SSL客户端密钥
    client_key='/path/to/clientkey.pem'
)

配置忽略响应状态码

es = Elasticsearch(['127.0.0.1:9200'],ignore=400)  # 忽略返回的400状态码
es = Elasticsearch(['127.0.0.1:9200'],ignore=[400, 405, 502])  # 以列表的形式忽略多个状态码

一个简单的示例

from elasticsearch import  Elasticsearch
es = Elasticsearch()    # 默认连接本地elasticsearch
print(es.index(index='py2', doc_type='doc', id=1, body={'name': "张开", "age": 18}))
print(es.get(index='py2', doc_type='doc', id=1))

第1个 print 为创建 py2 索引,并插入一条数据,第 2个 print 查询指定文档。

查询结果如下:

{'_index': 'py2', '_type': 'doc', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
{'_index': 'py2', '_type': 'doc', '_id': '1', '_version': 1, 'found': True, '_source': {'name': '张开', 'age': 18}}

示例 1

from datetime import datetime
from elasticsearch import Elasticsearch
es = Elasticsearch()

doc = {
    'author': 'kimchy',
    'text': 'Elasticsearch: cool. bonsai cool.',
    'timestamp': datetime.now(),
}
res = es.index(index="test-index", doc_type='tweet', id=1, body=doc)
print(res['result'])

res = es.get(index="test-index", doc_type='tweet', id=1)
print(res['_source'])

es.indices.refresh(index="test-index")

res = es.search(index="test-index", body={"query": {"match_all": {}}})
print("Got %d Hits:" % res['hits']['total']['value'])
for hit in res['hits']['hits']:
    print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])

示例 2:

# -*- coding: utf-8 -*-
 
from elasticsearch import Elasticsearch
 
# 默认host为localhost,port为9200.但也可以指定host与port
es = Elasticsearch()
 
# 插入数据,index,doc_type名称可以自定义,id可以根据需求赋值,body为内容
es.index(index="my_index",doc_type="test_type",id=0,body={"name":"python","addr":"深圳"})
es.index(index="my_index",doc_type="test_type",id=1,body={"name":"python","addr":"深圳"})
 
# 同样是插入数据,create() 方法需要我们指定 id 字段来唯一标识该条数据,
# 而 index() 方法则不需要,如果不指定 id,会自动生成一个 id
es.create(index="my_index",doc_type="test_type",id=1,body={"name":"python","addr":"深圳"})
 
#删除指定的index、type、id的文档
es.delete(index='indexName', doc_type='typeName', id=1) 
 
#删除index
es.indices.delete(index='news', ignore=[400, 404])
 
query = {'query': {'match_all': {}}}# 查找所有文档
query1 = {'query': {'match': {'sex': 'famale'}}}# 删除性别为女性的所有文档 
query2 = {'query': {'range': {'age': {'lt': 11}}}}# 删除年龄小于11的所有文档
query3 = {'query': {'term': {'name': 'jack'}}}# 查找名字叫做jack的所有文档
 
 
#删除所有文档
es.delete_by_query(index="my_index",doc_type="test_type",body=query)
 
#get:获取指定index、type、id所对应的文档
es.get(index="my_index",doc_type="test_type",id=1)
 
#search:查询满足条件的所有文档,没有id属性,且index,type和body均可为None
result = es.search(index="my_index",doc_type="test_type",body=query)
print(result['hits']['hits'][0])# 返回第一个文档的内容
 
#update:更新指定index、type、id所对应的文档
#更新的主要点: 
#1. 需要指定 id 
#2. body={"doc": <xxxx>} , 这个doc是必须的
es.update(index="my_index",doc_type="test_type",id=1,body={"doc":{"name":"python1","addr":"深圳1"}})

获取相关信息

测试集群是否启动
In [40]: es.ping()
Out[40]: True

获取集群基本信息
In [39]: es.info()

获取集群的健康状态信息
In [41]: es.cluster.health()

获取当前连接的集群节点信息
In [43]: es.cluster.client.info()

获取集群目前所有的索引
In [55]: print(es.cat.indices())

获取集群的更多信息
es.cluster.stats()

利用实例的 cat 属性得到更简单易读的信息

In [85]: es.cat.health()
Out[85]: '1510431262 04:14:22 sharkyun yellow 1 1 6 6 0 0 6 0 - 50.0%\n'
 
In [86]: es.cat.master()
Out[86]: 'VXgFbKAaTtGO5a1QAfdcLw 172.16.153.129 172.16.153.129 master\n'
 
In [87]: es.cat.nodes()
Out[87]: '172.16.153.129 27 49 0 0.02 0.01 0.00 mdi * master\n'
 
In [88]: es.cat.indices()
 
In [89]: es.cat.count()
Out[89]: '1510431323 04:15:23 301002\n'
 
In [90]: es.cat.plugins()
Out[90]: ''
 
In [91]: es.cat.templates()
Out[91]: 'logstash logstash-* 0 50001\nfilebeat filebeat-* 0 \n'

任务

es.tasks.get()
 
es.tasks.list()

单一操作

查看集群状态

from elasticsearch import Elasticsearch
es=Elasticsearch([{"host":"localhost","port":9200}])
print(es.cluster.state())

查看集群健康度

from elasticsearch import Elasticsearch
es=Elasticsearch([{"host":"localhost","port":9200}])
print(es.cluster.health())

增加一条文档

from elasticsearch import Elasticsearch
es = Elasticsearch([{"host":"localhost","port":9200}])
print(es.cluster.state())
b= {"name": 'lu', 'sex':'female', 'age': 10}
es.index(index='bank', doc_type='typeName',body=b,id=None)
print(es.cluster.state())

create() 方法需要指定 id 字段来唯一标识该条数据。index() 方法则不需要,如果不指定 id,会自动生成一个 id。
create() 方法内部其实也是调用了 index() 方法,是对 index() 方法的封装。

删除一条文档

from elasticsearch import Elasticsearch
es = Elasticsearch([{"host":"localhost","port":9200}])
es.delete(index='bank', doc_type='typeName', id='idValue')

修改一条文档

from elasticsearch import Elasticsearch
es = Elasticsearch([{"host":"localhost","port":9200}])
es.update(index='bank', doc_type='typeName', id='idValue', body={待更新字段})

更新操作利用 index() 方法同样可以做到,index() 方法完成两个操作,如果数据不存在,那就执行插入操作,如果已经存在,那就执行更新操作

查询一条文档

from elasticsearch import Elasticsearch
es = Elasticsearch([{"host":"localhost","port":9200}])
find=es.get(index='bank', doc_type='typeName', id='idValue')
print(find)

批量操作

从json文件中批量添加文档

from elasticsearch import Elasticsearch
es = Elasticsearch([{"host":"localhost","port":9200}])
with open('./accounts.json','r',encoding='utf-8') as file:
    s =file.read()
    print(s)
    es.bulk(index='bank',doc_type='typeName',body=s)

批量操作:

# -*- coding: utf-8 -*-
 
from elasticsearch import Elasticsearch
import os
 
#指定一个文件夹
path = r'C:\Users\Administrator\Desktop\files'
es = Elasticsearch()
doc = []
i = 1
#获取文件夹下所有文件的绝对路径和文件名
for dirname,pathname,filenames in os.walk(path):
    for filename in filenames:
        doc.append({"index":{"_id" : i}})
        doc.append({"filepath":os.path.join(dirname,filename)})
        i = i + 1
es.bulk(index="test",doc_type="text",body=doc)

按条件删除文档

query = {'query': {'match': {'sex': 'famale'}}}# 删除性别为女性的所有文档
 
query = {'query': {'range': {'age': {'lt': 11}}}}# 删除年龄小于51的所有文档
 
es.delete_by_query(index='indexName', body=query, doc_type='typeName')

条件更新

update_by_query:更新满足条件的所有数据,写法同上删除和查询

按条件查询文档

query = {'query': {'match_all': {}}}# 查找所有文档
 
query = {'query': {'term': {'name': 'jack'}}}# 查找名字叫做jack的所有文档
 
query = {'query': {'range': {'age': {'gt': 11}}}}# 查找年龄大于11的所有文档
 
allDoc = es.search(index='indexName', doc_type='typeName', body=query)
 
print allDoc['hits']['hits'][0]# 返回第一个文档的内容

#批量写入、删除、更新

doc = [
     {"index": {}},
     {'name': 'jackaaa', 'age': 2000, 'sex': 'female', 'address': u'北京'},
     {"index": {}},
     {'name': 'jackbbb', 'age': 3000, 'sex': 'male', 'address': u'上海'},
     {"index": {}},
     {'name': 'jackccc', 'age': 4000, 'sex': 'female', 'address': u'广州'},
     {"index": {}},
     {'name': 'jackddd', 'age': 1000, 'sex': 'male', 'address': u'深圳'},
 ]
 doc = [
    {'index': {'_index': 'indexName', '_type': 'typeName', '_id': 'idValue'}}
    {'name': 'jack', 'sex': 'male', 'age': 10 }
    {'delete': {'_index': 'indexName', '_type': 'typeName', '_id': 'idValue'}}
    {"create": {'_index' : 'indexName', "_type" : 'typeName', '_id': 'idValue'}}
    {'name': 'lucy', 'sex': 'female', 'age': 20 }
    {'update': {'_index': 'indexName', '_type': 'typeName', '_id': 'idValue'}}
    {'doc': {'age': '100'}}
 ]
 es.bulk(index='indexName',  doc_type='typeName', body=doc)
 
 #批量更新也可以采用如下的方式进行json拼装,最后写入
 for line in list:
            action = {
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": i, #_id 也可以默认生成,不赋值
                "_source": {
                    "date": line['date'],
                    "source": line['source'].decode('utf8'),
                    "link": line['link'],
                    "keyword": line['keyword'].decode('utf8'),
                    "title": line['title'].decode('utf8')}
            }
            i += 1
            ACTIONS.append(action)
success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)

Python Elasticsearch Client 还提供了很多功能。

参考文档:https://elasticsearch-py.readthedocs.io/en/master/api.htmlhttps://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html

搜索所有数据

es.search(index="my_index",doc_type="test_type")
# 或者
body = {
    "query":{
        "match_all":{}
    }
}
es.search(index="my_index",doc_type="test_type",body=body)

body = {
    "query":{
        "term":{
            "name":"python"
        }
    }
}
# 查询name="python"的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

body = {
    "query":{
        "terms":{
            "name":[
                "python","android"
            ]
        }
    }
}
# 搜索出name="python"或name="android"的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

#match 与 multi_match

# match:匹配name包含python关键字的数据
body = {
    "query":{
        "match":{
            "name":"python"
        }
    }
}
# 查询name包含python关键字的数据
es.search(index="my_index",doc_type="test_type",body=body)

# multi_match:在name和addr里匹配包含深圳关键字的数据
body = {
    "query":{
        "multi_match":{
            "query":"深圳",
            "fields":["name","addr"]
        }
    }
}
# 查询name和addr包含"深圳"关键字的数据
es.search(index="my_index",doc_type="test_type",body=body)

#ids

body = {
    "query":{
        "ids":{
            "type":"test_type",
            "values":[
                "1","2"
            ]
        }
    }
}
# 搜索出id为1或2d的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

#复合查询 bool

bool有3类查询关系,must(都满足),should(其中一个满足),must_not(都不满足)

body = {
    "query":{
        "bool":{
            "must":[
                {
                    "term":{
                        "name":"python"
                    }
                },
                {
                    "term":{
                        "age":18
                    }
                }
            ]
        }
    }
}
# 获取name="python"并且age=18的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

#切片式查询

body = {
    "query":{
        "match_all":{}
    }
    "from":2    # 从第二条数据开始
    "size":4    # 获取4条数据
}
# 从第2条数据开始,获取4条数据
es.search(index="my_index",doc_type="test_type",body=body)

#范围查询

body = {
    "query":{
        "range":{
            "age":{
                "gte":18,       # >=18
                "lte":30        # <=30
            }
        }
    }
}
# 查询18<=age<=30的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

#前缀查询

body = {
    "query":{
        "prefix":{
            "name":"p"
        }
    }
}
# 查询前缀为"赵"的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

#通配符查询

body = {
    "query":{
        "wildcard":{
            "name":"*id"
        }
    }
}
# 查询name以id为后缀的所有数据
es.search(index="my_index",doc_type="test_type",body=body)

#排序

body = {
    "query":{
        "match_all":{}
    }
    "sort":{
        "age":{                 # 根据age字段升序排序
            "order":"asc"       # asc升序,desc降序
        }
    }
}

#filter_path
响应过滤

# 只需要获取_id数据,多个条件用逗号隔开
es.search(index="my_index",doc_type="test_type",filter_path=["hits.hits._id"])

# 获取所有数据
es.search(index="my_index",doc_type="test_type",filter_path=["hits.hits._*"])

#count
执行查询并获取该查询的匹配数

# 获取数据量
es.count(index="my_index",doc_type="test_type")

#度量类聚合
获取最小值

body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "min_age":{                 # 最小值的key
            "min":{                 # 最小
                "field":"age"       # 查询"age"的最小值
            }
        }
    }
}
# 搜索所有数据,并获取age最小的值
es.search(index="my_index",doc_type="test_type",body=body)

获取最大值

body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "max_age":{                 # 最大值的key
            "max":{                 # 最大
                "field":"age"       # 查询"age"的最大值
            }
        }
    }
}
# 搜索所有数据,并获取age最大的值
es.search(index="my_index",doc_type="test_type",body=body)

获取和

body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "sum_age":{                 # 和的key
            "sum":{                 # 和
                "field":"age"       # 获取所有age的和
            }
        }
    }
}
# 搜索所有数据,并获取所有age的和
es.search(index="my_index",doc_type="test_type",body=body)

获取平均值

body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "avg_age":{                 # 平均值的key
            "sum":{                 # 平均值
                "field":"age"       # 获取所有age的平均值
            }
        }
    }
}
# 搜索所有数据,获取所有age的平均值
es.search(index="my_index",doc_type="test_type",body=body)

更多的搜索用法:https://elasticsearch-py.readthedocs.io/en/master/api.html

查询数据

上面的几个操作都是非常简单的操作,普通的数据库如 MongoDB 都是可以完成的,看起来并没有什么了不起的,Elasticsearch 更特殊的地方在于其异常强大的检索功能。

对于中文来说,我们需要安装一个分词插件,这里使用的是 elasticsearch-analysis-ik,GitHub 链接为:https://github.com/medcl/elasticsearch-analysis-ik,这里我们使用 Elasticsearch 的另一个命令行工具 elasticsearch-plugin 来安装,这里安装的版本是 6.2.4,请确保和 Elasticsearch 的版本对应起来,命令如下:

elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.2.4/elasticsearch-analysis-ik-6.2.4.zip

这里的版本号请替换成你的 Elasticsearch 的版本号。安装之后重新启动 Elasticsearch 就可以了,它会自动加载安装好的插件。首先我们新建一个索引并指定需要分词的字段,代码如下:

from elasticsearch import Elasticsearch
 
es = Elasticsearch()
mapping = {
    'properties': {
        'title': {
            'type': 'text',
            'analyzer': 'ik_max_word',
            'search_analyzer': 'ik_max_word'
        }
    }
}
es.indices.delete(index='news', ignore=[400, 404])
es.indices.create(index='news', ignore=400)
result = es.indices.put_mapping(index='news', doc_type='politics', body=mapping)
print(result)

这里我们先将之前的索引删除了,然后新建了一个索引,然后更新了它的 mapping 信息,mapping 信息中指定了分词的字段,指定了字段的类型 type 为 text,分词器 analyzer 和 搜索分词器 search_analyzer 为 ik_max_word,即使用我们刚才安装的中文分词插件。如果不指定的话则使用默认的英文分词器。

接下来我们插入几条新的数据:

datas = [
    {
        'title': '美国留给伊拉克的是个烂摊子吗',
        'url': 'http://view.news.qq.com/zt2011/usa_iraq/index.htm',
        'date': '2011-12-16'
    },
    {
        'title': '公安部:各地校车将享最高路权',
        'url': 'http://www.chinanews.com/gn/2011/12-16/3536077.shtml',
        'date': '2011-12-16'
    },
    {
        'title': '中韩渔警冲突调查:韩警平均每天扣1艘中国渔船',
        'url': 'https://news.qq.com/a/20111216/001044.htm',
        'date': '2011-12-17'
    },
    {
        'title': '中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首',
        'url': 'http://news.ifeng.com/world/detail_2011_12/16/11372558_0.shtml',
        'date': '2011-12-18'
    }
]
 
for data in datas:
    es.index(index='news', doc_type='politics', body=data)

这里我们指定了四条数据,都带有 title、url、date 字段,然后通过 index() 方法将其插入 Elasticsearch 中,索引名称为 news,类型为 politics。

接下来我们根据关键词查询一下相关内容:

result = es.search(index='news', doc_type='politics')
print(result)

可以看到查询出了所有插入的四条数据:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "news",
        "_type": "politics",
        "_id": "c05G9mQBD9BuE5fdHOUT",
        "_score": 1.0,
        "_source": {
          "title": "美国留给伊拉克的是个烂摊子吗",
          "url": "http://view.news.qq.com/zt2011/usa_iraq/index.htm",
          "date": "2011-12-16"
        }
      },
      {
        "_index": "news",
        "_type": "politics",
        "_id": "dk5G9mQBD9BuE5fdHOUm",
        "_score": 1.0,
        "_source": {
          "title": "中国驻洛杉矶领事馆遭亚裔男子枪击,嫌犯已自首",
          "url": "http://news.ifeng.com/world/detail_2011_12/16/11372558_0.shtml",
          "date": "2011-12-18"
        }
      },
      {
        "_index": "news",
        "_type": "politics",
        "_id": "dU5G9mQBD9BuE5fdHOUj",
        "_score": 1.0,
        "_source": {
          "title": "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船",
          "url": "https://news.qq.com/a/20111216/001044.htm",
          "date": "2011-12-17"
        }
      },
      {
        "_index": "news",
        "_type": "politics",
        "_id": "dE5G9mQBD9BuE5fdHOUf",
        "_score": 1.0,
        "_source": {
          "title": "公安部:各地校车将享最高路权",
          "url": "http://www.chinanews.com/gn/2011/12-16/3536077.shtml",
          "date": "2011-12-16"
        }
      }
    ]
  }
}

可以看到返回结果会出现在 hits 字段里面,然后其中有 total 字段标明了查询的结果条目数,还有 max_score 代表了最大匹配分数。

另外我们还可以进行全文检索,这才是体现 Elasticsearch 搜索引擎特性的地方:

dsl = {
    'query': {
        'match': {
            'title': '中国 领事馆'
        }
    }
}
 
es = Elasticsearch()
result = es.search(index='news', doc_type='politics', body=dsl)
print(json.dumps(result, indent=2, ensure_ascii=False))

这里我们使用 Elasticsearch 支持的 DSL 语句来进行查询,使用 match 指定全文检索,检索的字段是 title,内容是“中国领事馆”,搜索结果如下:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 2.546152,
    "hits": [
      {
        "_index": "news",
        "_type": "politics",
        "_id": "dk5G9mQBD9BuE5fdHOUm",
        "_score": 2.546152,
        "_source": {
          "title": "中国驻洛杉矶领事馆遭亚裔男子枪击,嫌犯已自首",
          "url": "http://news.ifeng.com/world/detail_2011_12/16/11372558_0.shtml",
          "date": "2011-12-18"
        }
      },
      {
        "_index": "news",
        "_type": "politics",
        "_id": "dU5G9mQBD9BuE5fdHOUj",
        "_score": 0.2876821,
        "_source": {
          "title": "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船",
          "url": "https://news.qq.com/a/20111216/001044.htm",
          "date": "2011-12-17"
        }
      }
    ]
  }
}

这里我们看到匹配的结果有两条,第一条的分数为 2.54,第二条的分数为 0.28,这是因为第一条匹配的数据中含有“中国”和“领事馆”两个词,第二条匹配的数据中不包含“领事馆”,但是包含了“中国”这个词,所以也被检索出来了,但是分数比较低。

因此可以看出,检索时会对对应的字段全文检索,结果还会按照检索关键词的相关性进行排序,这就是一个基本的搜索引擎雏形。

另外 Elasticsearch 还支持非常多的查询方式,详情可以参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/6.3/query-dsl.html

特点

该客户端被设计为Elasticsearch的REST API的非常薄的包装,以实现最大的灵活性。这意味着这个客户没有意见;这也意味着从Python中使用一些api有点麻烦。我们创建了一些帮助程序来帮助解决这个问题,并在此基础上创建了一个更高级的库( Elasticsearch -dsl :https://elasticsearch-dsl.readthedocs.io/en/latest/ ) 来提供使用 Elasticsearch 的更方便的方法。

持久化连接

elasticsearch-py使用各个连接池中的持久连接(每个配置或嗅探节点一个)。您可以在两个http协议实现之间进行选择。有关更多信息,请参见传输类  Transport classes 。

elasticsearch-py uses persistent connections inside of individual connection pools (one per each configured or sniffed node). Out of the box you can choose between two http protocol implementations. See Transport classes for more information.

The transport layer will create an instance of the selected connection class per node and keep track of the health of individual nodes - if a node becomes unresponsive (throwing exceptions while connecting to it) it’s put on a timeout by the ConnectionPool class and only returned to the circulation after the timeout is over (or when no live nodes are left). By default nodes are randomized before being passed into the pool and round-robin strategy is used for load balancing.

You can customize this behavior by passing parameters to the Connection Layer API (all keyword arguments to the Elasticsearch class will be passed through). If what you want to accomplish is not supported you should be able to create a subclass of the relevant component and pass it in as a parameter to be used instead of the default implementation.

Automatic Retries (自动重试)

If a connection to a node fails due to connection issues (raises ConnectionError) it is considered in faulty state. It will be placed on hold for dead_timeout seconds and the request will be retried on another node. If a connection fails multiple times in a row the timeout will get progressively larger to avoid hitting a node that’s, by all indication, down. If no live connection is available, the connection that has the smallest timeout will be used.

By default retries are not triggered by a timeout (ConnectionTimeout), set retry_on_timeout to True to also retry on timeouts.

Sniffing(嗅探)

The client can be configured to inspect the cluster state to get a list of nodes upon startup, periodically and/or on failure. See Transport parameters for details.

Some example configurations:

from elasticsearch import Elasticsearch

# by default we don't sniff, ever
es = Elasticsearch()

# you can specify to sniff on startup to inspect the cluster and load
# balance across all nodes
es = Elasticsearch(["seed1", "seed2"], sniff_on_start=True)

# you can also sniff periodically and/or after failure:
es = Elasticsearch(["seed1", "seed2"],
          sniff_on_start=True,
          sniff_on_connection_fail=True,
          sniffer_timeout=60)

Thread safety (线程安全)

The client is thread safe and can be used in a multi threaded environment. Best practice is to create a single global instance of the client and use it throughout your application. If your application is long-running consider turning on Sniffing to make sure the client is up to date on the cluster location.

By default we allow urllib3 to open up to 10 connections to each node, if your application calls for more parallelism, use the maxsize parameter to raise the limit:

# allow up to 25 connections to each node
es = Elasticsearch(["host1", "host2"], maxsize=25)

注意 :Since we use persistent connections throughout the client it means that the client doesn’t tolerate fork very well. If your application calls for multiple processes make sure you create a fresh client after call to fork. Note that Python’s multiprocessing module uses fork to create new processes on POSIX systems.

SSL and Authentication

You can configure the client to use SSL for connecting to your elasticsearch cluster, including certificate verification and HTTP auth:

from elasticsearch import Elasticsearch

# you can use RFC-1738 to specify the url
es = Elasticsearch(['https://user:secret@localhost:443'])

# ... or specify common parameters as kwargs

es = Elasticsearch(
    ['localhost', 'otherhost'],
    http_auth=('user', 'secret'),
    scheme="https",
    port=443,
)

# SSL client authentication using client_cert and client_key

from ssl import create_default_context

context = create_default_context(cafile="path/to/cert.pem")
es = Elasticsearch(
    ['localhost', 'otherhost'],
    http_auth=('user', 'secret'),
    scheme="https",
    port=443,
    ssl_context=context,
)

警告:elasticsearch-py doesn’t ship with default set of root certificates. To have working SSL certificate validation you need to either specify your own as cafile or capath or cadata or install certifi which will be picked up automatically.

See class Urllib3HttpConnection for detailed description of the options.

Logging (日志)

elasticsearch-py uses the standard logging library from python to define two loggers: elasticsearch and elasticsearch.traceelasticsearch is used by the client to log standard activity, depending on the log level. elasticsearch.trace can be used to log requests to the server in the form of curl commands using pretty-printed json that can then be executed from command line. Because it is designed to be shared (for example to demonstrate an issue) it also just uses localhost:9200 as the address instead of the actual address of the host. If the trace logger has not been configured already it is set to propagate=False so it needs to be activated separately.

Environment considerations (环境注意事项)

When using the client there are several limitations of your environment that could come into play.

When using an HTTP load balancer you cannot use the Sniffing functionality - the cluster would supply the client with IP addresses to directly connect to the cluster, circumventing the load balancer. Depending on your configuration this might be something you don’t want or break completely.

In some environments (notably on Google App Engine) your HTTP requests might be restricted so that GET requests won’t accept body. In that case use the send_get_body_as parameter of Transport to send all bodies via post:

from elasticsearch import Elasticsearch
es = Elasticsearch(send_get_body_as='POST')

Compression (压缩)

When using capacity-constrained networks (low throughput), it may be handy to enable compression. This is especially useful when doing bulk loads or inserting large documents. This will configure compression.

from elasticsearch import Elasticsearch
es = Elasticsearch(hosts, http_compress=True)

Running on AWS with IAM

If you want to use this client with IAM based authentication on AWS you can use the requests-aws4auth package:

from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

host = 'YOURHOST.us-east-1.es.amazonaws.com'
awsauth = AWS4Auth(YOUR_ACCESS_KEY, YOUR_SECRET_KEY, REGION, 'es')

es = Elasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)
print(es.info())

Customization (定制)

By default, JSONSerializer is used to encode all outgoing requests. However, you can implement your own custom serializer:

from elasticsearch.serializer import JSONSerializer

class SetEncoder(JSONSerializer):
    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        if isinstance(obj, Something):
            return 'CustomSomethingRepresentation'
        return JSONSerializer.default(self, obj)

es = Elasticsearch(serializer=SetEncoder())

Contents

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐