社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

Elasticsearch聚合到pandas数据帧

Martin • 4 年前 • 1143 次点击  

我正在处理一些ElasticSearch数据,我想从Kibana这样的聚合中生成表。聚合的示例输出如下,基于以下代码:

    s.aggs.bucket("name1", "terms", field="field1").bucket(
        "name2", "terms", field="innerField1"
    ).bucket("name3", "terms", field="InnerAgg1")
     response = s.execute()
   resp_dict = response.aggregations.name.buckets




{
    "key": "Locationx",
    "doc_count": 12,
    "name2": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [{
            "key": "Sub-Loc1",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }, {
            "key": "Sub-Loc2",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }]
    }
}

在这种情况下,预期产出将是:

Expected Output

现在,我尝试了多种方法,并简要描述了出现的问题:

Pandasticsearch=即使只有一本字典也完全失败。这本字典并没有被创造出来,因为它在与钥匙斗争,即使每本字典都被分开处理:

for d in resp_dict :
    x= d.to_dict()
    pandas_df = Select.from_dict(x).to_pandas()
    print(pandas_df)

特别是,收到的错误与字典没有编出来,因此['take']不是一个键有关。

Pandas(pd.Dataframe.from_records())=只给了我第一个聚合,其中有一列包含内部字典,在它上面使用pd.apply(pd.Series)给出了另一个结果字典表。

堆垛溢水柱 recursive function 这本词典看起来与所用的例子完全不同,除非我彻底改变输入,否则修改不会给我带来任何好处。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/57249
 
1143 次点击  
文章 [ 1 ]  |  最新文章 4 年前
TheDonk
Reply   •   1 楼
TheDonk    4 年前

在同一个问题上挣扎,我开始相信原因是反应不是正常的指令,而是 elasticsearch_dsl.utils.AttrList 属于 elasticsearch_dsl.utils.AttrDict .

如果你有一个 AttrList AttrDicts ,可以执行以下操作:

resp_dict = response.aggregations.name.buckets
new_response = [i._d_ for i in resp_dict]

以获取一个普通听写的列表。这可能会更好的发挥与其他图书馆。

编辑:

我写了一个递归函数,它至少处理了一些情况,但还没有经过广泛的测试,也没有包装在一个好的模块或任何东西中。只是个剧本。这个 one_lvl 函数在名为 tmp

这个 lvl 东西是必要的,因为你可能有重复的名字,所以 key

#!/usr/bin/env python3

from elasticsearch_dsl.query import QueryString
from elasticsearch_dsl import Search, A
from elasticsearch import Elasticsearch
import pandas as pd

PORT = 9250
TIMEOUT = 10000
USR = "someusr"
PW = "somepw"
HOST = "test.com"
INDEX = "my_index"
QUERY = "foobar"

client = Elasticsearch([HOST], port = PORT, http_auth=(USR, PW), timeout = TIMEOUT)

qs = QueryString(query = QUERY)
s = Search(using=client, index=INDEX).query(qs)

s = s.params(size = 0)

agg= {
    "dates" : A("date_histogram", field="date", interval="1M", time_zone="Europe/Berlin"),
    "region" : A("terms", field="region", size=10),
    "county" : A("terms", field="county", size = 10)
}

s.aggs.bucket("dates", agg["dates"]). \
       bucket("region", agg["region"]). \
       bucket("county", agg["county"])

resp = s.execute()

data = {"buckets" : [i._d_ for i in resp.aggregations.dates]}
rec_list = ["buckets"] + [*agg.keys()]

def get_fields(i, lvl):
    return {(k + f"{lvl}"):v for k, v in i.items() if k not in rec_list}

def one_lvl(data, tmp, lvl, rows, maxlvl):
    tmp = {**tmp, **get_fields(data, lvl)}

    if "buckets" not in data:
        rows.append(tmp)

    for d in data:
        if d in ["buckets"]:
            for v, b in enumerate(data[d]):
                tmp = {**tmp, **get_fields(data[d][v], lvl)}
                for k in b:
                    if k in agg.keys():
                        one_lvl(data[d][v][k], tmp, lvl+1, rows, maxlvl)
                    else:
                        if lvl == maxlvl:
                            tmp = {**tmp, (k + f"{lvl}") : data[d][v][k]}
                            rows.append(tmp)

    return rows


rows = one_lvl(data, {}, 1, [], len(agg))
df = pd.DataFrame(rows)