3

常用Py3操作Elasticsearch方法

 2 years ago
source link: https://blog.51cto.com/u_13717475/5354107
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

常用Py3操作Elasticsearch方法

原创

以下内容是通过Python来操作Es的代码操作示例

Demo示例

from elasticsearch import Elasticsearch

# 设置连接Es
es_client = Elasticsearch([{"host": "IP", "port": 9200}])
# 查看Es状态
print(es_client.cat.health())
# Output: 1654140714 11:31:54 TestEs yellow 1 1 620 620 0 0 620 0 - 50.0%

# 查看Es有哪些索引
print(es_client.cat.indices())
# Output:
yellow open user_info tJ_1DHJ7TjWdgZz9FSIvkg 1 1      9    1 183.6kb 183.6kb
yellow open test      KkmiWv_yQ265u7qbg2Z0FA 5 1   1851    0     3mb     3mb

# 查看Es节点有哪些
print(es_client.cat.nodes())
# Output: 192.168.5.112 56 59 0 0.00 0.01 0.05 mdi * node-1

# 查看Es主节点
print(es_client.cat.master())
# Output: UTKLTopJQv-qC1CUPA2aSg 192.168.5.112 192.168.5.112 node-1

# 删除索引
print(es_client.indices.delete(index="test"))
# Output: {'acknowledged': True}

# 创建索引
body = {
    "mappings": {
        "_default_": {
            "properties": {
                "id": {
                    "type": "integer"
                },
                "name": {
                    "type": "text"
                }
            }
        }
    }
}
# 通过设置mapping来新建索引,当然body不指定的话默认使用的是Es自动推导的类型
print(es_client.indices.create(index="test", body=body))
# Output: {'acknowledged': True, 'shards_acknowledged': True}

# 重建索引
body = {"query": {"match_all": {}}}  #遍历原索引,可自定义query
print(helpers.reindex(client=es_client, source_index="test", target_index="new_test", target_client=es_client,
                      query=body))
# Output: (10, 10)
# 单条新增
print(es_client.index(index="test",
                      doc_type="user_info",
                      id='1',
                      body={"name": "张三", "age": 35, "isChina": True, "hobby": ["唱歌", "跳舞"]}))
# Output: {'_index': 'test', '_type': 'user_info', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, 'created': True}

# 批量新增
action = [{
            "_index": "test",
            "_type": "user_info",
            "_id": "1",
            "_source": {
                'name': "李四",
                'age': 35,
                'isChina': True,
                "hobby": ["唱歌", "跳舞"]
            }
        }]
bulk_insert_res = helpers.bulk(client=es_client, actions=action, stats_only=True)
print(bulk_insert_res)
# Output: (1, 0)
# 单条删除
print(es_client.delete(index='test', doc_type='user_info', id=1))
# Output: {'found': True, '_index': 'test', '_type': 'user_info', '_id': '1', '_version': 3, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}}


# 删除指定条件数据, 批量删除
query = {
    "query": {
        "term": {"age": 35}
    }
}
print(es_client.delete_by_query(index='test', doc_type='user_info', body=query))
# Output: {'took': 0, 'timed_out': False, 'total': 0, 'deleted': 1, 'batches': 0, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}

ES不支持更新操作,具体更新操作底层实现的原理是: 删除原来索引的数据,插入新索引的数据。每一次更新,Es的_version字段内容会递增

# 单条更新(注意:这里的更新对象中,需要有doc关键字作为key)
body = {
    "doc": {"name": "张三-修改", "age": 35, "isChina": True, "hobby": ["唱歌", "跳舞"]}
}
# print(es_client.update(index='test', doc_type='user_info', id="1",body=body))
# Output: {'_index': 'test', '_type': 'user_info', '_id': '1', '_version': 3, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}}

# 批量更新带条件(注意:这里使用的是script更新,老版本的需要将inline换成source)
uBody = {
        # 查询条件
        'query': {
            'term': {
                "age": 35
            }
        },
        # 更新内容,第一种更新方式
        'script': {
            "inline": "ctx._source.name = params.name",
            "params": {
                "name": "张三-script第一种修改"
            },
        }
}
print(es_client.update_by_query(index='test', doc_type='user_info', body=uBody))
# Output: {'took': 0, 'timed_out': False, 'total': 0, 'updated': 1, 'deleted': 0, 'batches': 0, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}

# bulk批量更新
actions = [
    {
        "_op_type": "update",  # 操作命令,这里为更新
        "index": "test",  # 数据的索引
        "_id": "1",  # 要更新的数据 _id
        "doc": {
            "name": "bulk更新"
        }
    }
]
print(helpers.bulk(client=es_client, actions=actions, index="test", doc_type="user_info", raise_on_error=True))
# Output: (1, [])
# 返回全部数据
print(es_client.search(index='test', doc_type='user_info'))
# Output: {'took': 0, 'timed_out': False, '_shards': {'total': 5, 'successful': 5, 'failed': 0}, 'hits': {'total': 1, 'max_score': 1.0, 'hits': [{'_index': 'test', '_type': 'user_info', '_id': '1', '_score': 1.0, '_source': {'name': '李四', 'age': 35, 'isChina': True, 'hobby': ['唱歌', '跳舞']}}]}}

# 条件查询(注意:这里变化的就是DSL语句)
queryBody = {
    'query': {
        'match_all': {}
    }
}
print(es_client.search(index='test', doc_type='user_info', body=queryBody))
# Output: {'took': 0, 'timed_out': False, '_shards': {'total': 5, 'successful': 5, 'failed': 0}, 'hits': {'total': 1, 'max_score': 1.0, 'hits': [{'_index': 'test', '_type': 'user_info', '_id': '1', '_score': 1.0, '_source': {'name': '李四', 'age': 35, 'isChina': True, 'hobby': ['唱歌', '跳舞']}}]}}
  • 收藏
  • 评论
  • 分享
  • 举报

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK