官网:https://milvus.io/
安装文档:https://milvus.io/cn/docs/v1.1.1/milvus_docker-cpu.md
可视化:https://zilliz.com/products/em

CURD API文档来表示 增加:Create 修改:Update 查找:Read 删除:Delete 外

代码文档api:https://milvus.io/cn/api-reference/pymilvus/v1.1.2/api.html
函数说明:https://milvus.io/cn/docs/v1.1.1/insert_delete_vector_python.md

在调用 delete 接口后,用户可以选择再调用 flush,保证新增的数据可见,被删除的数据不会再被搜到。

# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from milvus import *
from config import MILVUS_HOST, MILVUS_PORT, collection_param, index_type, index_param, top_k, search_param


class VecToMilvus():
    def __init__(self):
        self.client = Milvus(host=MILVUS_HOST, port=MILVUS_PORT)

    def has_collection(self, collection_name):
        try:
            status, ok = self.client.has_collection(collection_name)
            return ok
        except Exception as e:
            print("Milvus has_table error:", e)

    def creat_collection(self, collection_name):
        try:
            collection_param['collection_name'] = collection_name
            status = self.client.create_collection(collection_param)
            print(status)
            return status
        except Exception as e:
            print("Milvus create collection error:", e)

    def create_index(self, collection_name):
        try:
            status = self.client.create_index(collection_name, index_type,
                                              index_param)
            print(status)
            return status
        except Exception as e:
            print("Milvus create index error:", e)

    def has_partition(self, collection_name, partition_tag):
        try:
            status, ok = self.client.has_partition(collection_name,
                                                   partition_tag)
            return ok
        except Exception as e:
            print("Milvus has partition error: ", e)

    def create_partition(self, collection_name, partition_tag):
        try:
            status = self.client.create_partition(collection_name,
                                                  partition_tag)
            print('create partition {} successfully'.format(partition_tag))
            return status
        except Exception as e:
            print('Milvus create partition error: ', e)

    def insert_ini(self, vectors, collection_name, ids=None, partition_tag=None):
        try:
            self.client.drop_collection(collection_name)
            self.client.drop_index(collection_name)
            self.creat_collection(collection_name)
            self.create_index(collection_name)
            print('collection info: {}'.format(self.client.get_collection_info(collection_name)[1]))
            self.create_partition(collection_name, partition_tag)
            status, ids = self.client.insert(
                collection_name=collection_name,
                records=vectors,
                ids=ids,
                partition_tag=partition_tag)
            self.client.flush([collection_name])
            print(
                'Insert {} entities, there are {} entities after insert data.'.
                format(
                    len(ids), self.client.count_entities(collection_name)[1]))
            return status, ids
        except Exception as e:
            print("Milvus insert error:", e)

    def insert(self, vectors, collection_name, ids=None, partition_tag=None):
        try:
            if not self.has_collection(collection_name):
                self.creat_collection(collection_name)
                self.create_index(collection_name)
                print('collection info: {}'.format(
                    self.client.get_collection_info(collection_name)[1]))
            if (partition_tag is not None) and (
                    not self.has_partition(collection_name, partition_tag)):
                self.create_partition(collection_name, partition_tag)

            status, result = self.find_id(ids, collection_name, partition_tag)
            if result == [[]] or result == []:
                status, ids = self.client.insert(
                    collection_name=collection_name,
                    records=vectors,
                    ids=ids,
                    partition_tag=partition_tag)
                self.client.flush([collection_name])
                print(
                    'Insert {} entities, there are {} entities after insert data.'.
                    format(
                        len(ids), self.client.count_entities(collection_name)[1]))
                return status, ids
            print('Insert entities already exist')
            return status, ids
        except Exception as e:
            print("Milvus insert error:", e)

    def search(self, vectors, collection_name, partition_tag=None):
        try:
            status, results = self.client.search(
                collection_name=collection_name,
                query_records=vectors,
                top_k=top_k,
                params=search_param,
                partition_tag=partition_tag)
            # print(status)
            return status, results
        except Exception as e:
            print('Milvus recall error: ', e)

    def delete(self, id_array, collection_name, partition_tag=None):
        try:
            status = self.client.delete_entity_by_id(collection_name=collection_name,
                                                     partition_tag=partition_tag,
                                                     id_array=id_array)
            self.client.flush([collection_name])
            self.client.compact(collection_name)
            return status
        except Exception as e:
            print('Milvus delete error: ', e)

    def find_id(self, ids, collection_name, partition_tag=None):
        try:
            status, result = self.client.get_entity_by_id(collection_name=collection_name,
                                                          partition_tag=partition_tag,
                                                          ids=ids)

            return status, result
        except Exception as e:
            print('Milvus find_id error: ', e)


if __name__ == '__main__':
    import random

    client = VecToMilvus()
    collection_name = 'test1'
    partition_tag = 'partition_1'
    ids = [random.randint(0, 1000) for _ in range(100)]
    embeddings = [[random.random() for _ in range(128)] for _ in range(100)]
    status, ids = client.insert(
        collection_name=collection_name,
        vectors=embeddings,
        ids=ids,
        partition_tag=partition_tag)
    print(status)
    print(ids)

元数据,不存储真实数据,存储数据表的描述

不管是分区还是段,都只是数据在物理存储中的组织形式。Milvus 进行查询操作时,必须要获知各个数据文件在物理存储上的位置以及状态信息,包括所属集合、包含的实体条数、文件的大小、全局唯一的标识、以及创建日期等等。我们将这些信息称为元数据。此外,元数据还包含集合以及分区的信息,包括集合名称、集合维度、索引类型、分区标签等等。

当数据发生改变时,元数据应相应变化并且易于获取,因此使用事务型数据库来管理元数据是一个理想的选择。Milvus 提供 SQLite 或者 MySQL 作为元数据的后端服务。对于生产环境或者分布式服务来说,应当使用 MySQL 来作为元数据后端服务。

元数据后端服务不负责存储实体数据和索引。

在这里插入图片描述

创建集合时 index_file_size 如何设置能达到性能最优?

使用客户端创建集合时有一个 index_file_size 参数,用来指定数据存储时单个文件的大小,其单位为 MB,默认值为 1024。当向量数据不断导入时,Milvus 会把数据增量式地合并成文件。当某个文件达到 index_file_size 所设置的值之后,这个文件就不再接受新的数据,Milvus 会把新的数据存成另外一个文件。这些都是原始向量数据文件,如果建立了索引,则每个原始文件会对应生成一个索引文件。Milvus 在进行搜索时,是依次对每个索引文件进行搜索。

根据我们的经验,当 index_file_size 从 1024 改为 2048 时,搜索性能会有 30% ~ 50% 左右的提升。但要注意如果该值设得过大,有可能导致大文件无法加载进显存(甚至内存)。比如显存只有 2 GB,该参数设为 3 GB,显存明显放不下。

如果向集合中导入数据的频率不高,建议将 index_file_size 的值设为 1024 MB 或者 2048 MB。如果后续会持续地向集合中导入增量数据,为了避免查询时未建立索引的数据文件过大,建议这种情况下将该值设置为 256 MB 或者 512 MB。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐