10

Python连接es笔记三之es更新操作 - XHunter

 1 year ago
source link: https://www.cnblogs.com/hunterxiong/p/17444265.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文首发于公众号:Hunter后端
原文链接:Python连接es笔记三之es更新操作

这一篇笔记介绍如何使用 Python 对数据进行更新操作。

对于 es 的更新的操作,不用到 Search() 方法,而是直接使用 es 的连接加上相应的函数来操作,本篇笔记目录如下:

  1. update()
  2. update_by_query()
  3. UpdateByQuery()

1、获取连接

如果使用的是之前的全局创建连接的方式:

from elasticsearch_dsl import connections
connections.configure(
    default={"hosts": "localhost:9200"},
)

我们可以根据别名获取相应的连接:

conn = connections.connections.get_connection("default")

或者我们直接使用 elasticsearch.Elasticsearch 模块来重新建立一个连接:

from elasticsearch import Elasticsearch

conn = Elasticsearch(hosts="localhost:9200")

前面介绍过,我们安装 elasticsearch_dsl 依赖的时候,会自动为我们安装上相应的 elasticsearch 模块,我们这里直接使用即可。

然后通过 conn 连接可以直接对数据进行更新,可用的方法有 update(),update_by_query() 以及一个批量的 bulk() 方法。

2、update()

update() 函数一般只用于指定 id 的更新操作,如果我们知道一条数据的 id,我们可以直接使用 update()。

比如对于 exam 这个 index 下 id=18 的数据,我们想要更新它的 name 字段和 address 字段分别为 王五和湖南省,我们可以如下操作:

conn.update(
    index="exam",
    id=18,
    body={
        "doc": {
            "name": "王五2",
            "address": "湖南省",
        }
    }
)

在上面的操作中,index 为指定的索引,id 参数为我们需要更新的 id,body 内 doc 下的字段即为我们要更新的数据。

3、update_by_query()

update_by_query() 函数不局限于 id 的查询更新,我们可以更新任意符合条件的数据,以下是一个简单的示例:

conn.update_by_query(
    index="exam",
    body={
        "query": {
            "term": {"name":  "张三丰"}
        },
        "script": {
            "source": "ctx._source.address = params.address",
            "params": {
                "address": "新地址",
            }
        }
    }
)

在这里,index 参数还是指向对应的索引,body 内包含了需要更新查询的条件,这里都在 query 参数内,需要更新的数据在 script 下,通过脚本的形式来操作更新。

这里注意下,我这里用到的是 7.6.0 版本,所以 script 下使用的 source,更低一点版本用的字段可能是 inline,这里使用对应版本的参数即可。

在 script.source 中,内容为 ctx._source.address = params.address,意思是将符合条件数据的 address 字段内容更新为 params 的 address 的数据。

如果想要更改其他字段内容,注意前面 ctx._source 为固定写法,只需要更改后面的字段名即可。

在 script.params 中,我们则可以定义各种对应的字段及其内容。

更新多个字段

如果我们想同时更新多个字段,比如说符合条件的数据将 address 改为 新地址,将 age 字段改为 28,我们则需要将多个条件在 script.source 中使用分号 ; 连接起来,示例如下:

conn.update_by_query(
    index="exam",
    body={
        "query": {
            "term": {"name":  "新张三丰2"}
        },
        "script": {
            "source": "ctx._source.address = params.address; ctx._source.age = params.age",
            "params": {
                "address": "新地址3",
                "age": "28"
            }
        }
    }
)

虽然这里更新多个字段需要使用分号连接,但是在实际的代码中我们不用这么写死,比如说我们需要更改三个字段,为 ["address", "name", "age"],我们如下操作:

field_list = ["address", "name", "age"]
source_list = [f"ctx._source.{key}=params.{key}" for key in field_list]

params = {
    "address": "新地址3",
    "age": "28",
    "name": "new name"
}

conn.update_by_query(
    index="exam",
    body={
        "query": {
            "term": {"name":  "新张三丰3"}
        },
        "script": {
            "source": ";".join(source_list),
            "params": params
        }
    }
)

4、批量更新

如果我们想批量更新一批数据,这批数据各个字段的值都不一致,自定义的程度很大,使用 update_by_query() 函数已经不现实了,怎么办?

好解决,我们可以使用 helpers.bulk() 批量更新方法。

首先引入这个模块:

from elasticsearch import helpers

假设我们系统里现在有 id 为 21,23,24 的几条数据,还是在 exam 这个索引下,我们来构造几条需要更新的数据来操作:

action_1 = {
    "_op_type": "update",
    "_index": "exam",
    "_id": 21,
    "doc": {"age": 19, "name": "令狐冲", "address": "华山派"},
}

action_2 = {
    "_op_type": "update",
    "_index": "exam",
    "_id": 23,
    "doc": {"age": 20, "name": "杨过", "address": "终南山"},
}

action_3 = {
    "_op_type": "update",
    "_index": "exam",
    "_id": 24,
    "doc": {"age": 21, "name": "张无忌", "address": "武当"},
}
action_list = [action_1, action_2, action_3]
helpers.bulk(conn, actions=action_list)

对于每一条需要更新的数据,有这几个参数:

_op_type:如果是更新操作,其值则是 update

_index:表示需要更新的数据所在的索引,这里是 exam

_id:表示这条需要更新的数据的 id

doc:是一个 dict 数据,其下包含了需要更新的字段及其对应的值

至此,一条需要更新的数据的结构就构造完毕了。

然后对于 helpers.bulk() 函数,接收的第一个参数为 es 连接,actions 参数是一个列表,其内容就是我们前面构造的数据的集合。

然后执行这个操作就可以发现 es 中对应的值已经更改了。

5、UpdateByQuery()

UpdateByQuery() 函数来源于 elasticsearch_dsl 模块,它的使用和 Search() 方法差不多,都是通过 using 和 index 参数来获取 es 连接和索引:

from elasticsearch_dsl import connections
from elasticsearch_dsl import UpdateByQuery
from elasticsearch_dsl import Q as ES_Q

connections.configure(
    default={"hosts": "localhost:9200"},
)


ubq = UpdateByQuery(using="default", index="exam")

使用这个方法更新数据的具体语法和 update_by_query 差不多,都是通过 script 的方式来操作,以下是一个简单示例:

ubq = UpdateByQuery(using="default", index="exam")

q1 = ES_Q("term", name="郭靖")

ubq = ubq.query(q1)

ubq = ubq.script(
    source="ctx._source.address=params.address",
    params={
        "address": "襄阳城"
    }
)

ubq.execute()

与 Search() 函数一样,都需要通过 execute() 函数来向 es 提交数据。

如果想获取更多后端相关文章,可扫码关注阅读:

image

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK