Py学习  »  Elasticsearch

Elasticsearch聚合到pandas数据帧

Martin • 5 年前 • 1862 次点击  

我正在处理一些ElasticSearch数据,我想从Kibana这样的聚合中生成表。聚合的示例输出如下,基于以下代码:

    s.aggs.bucket("name1", "terms", field="field1").bucket(
        "name2", "terms", field="innerField1"
    ).bucket("name3", "terms", field="InnerAgg1")
     response = s.execute()
   resp_dict = response.aggregations.name.buckets




{
    "key": "Locationx",
    "doc_count": 12,
    "name2": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [{
            "key": "Sub-Loc1",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }, {
            "key": "Sub-Loc2",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }]
    }
}

在这种情况下,预期产出将是:

Expected Output

现在,我尝试了多种方法,并简要描述了出现的问题:

Pandasticsearch=即使只有一本字典也完全失败。这本字典并没有被创造出来,因为它在与钥匙斗争,即使每本字典都被分开处理:

for d in resp_dict :
    x= d.to_dict()
    pandas_df = Select.from_dict(x).to_pandas()
    print(pandas_df)

特别是,收到的错误与字典没有编出来,因此['take']不是一个键有关。

Pandas(pd.Dataframe.from_records())=只给了我第一个聚合,其中有一列包含内部字典,在它上面使用pd.apply(pd.Series)给出了另一个结果字典表。

堆垛溢水柱 recursive function 这本词典看起来与所用的例子完全不同,除非我彻底改变输入,否则修改不会给我带来任何好处。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/57249
 
1862 次点击  
文章 [ 1 ]  |  最新文章 5 年前
TheDonk
Reply   •   1 楼
TheDonk    5 年前

在同一个问题上挣扎,我开始相信原因是反应不是正常的指令,而是 elasticsearch_dsl.utils.AttrList 属于 elasticsearch_dsl.utils.AttrDict .

如果你有一个 AttrList AttrDicts ,可以执行以下操作:

resp_dict = response.aggregations.name.buckets
new_response = [i._d_ for i in resp_dict]

以获取一个普通听写的列表。这可能会更好的发挥与其他图书馆。

编辑:

我写了一个递归函数,它至少处理了一些情况,但还没有经过广泛的测试,也没有包装在一个好的模块或任何东西中。只是个剧本。这个 one_lvl 函数在名为 tmp

这个 lvl 东西是必要的,因为你可能有重复的名字,所以 key

#!/usr/bin/env python3

from elasticsearch_dsl.query import QueryString
from elasticsearch_dsl import Search, A
from elasticsearch import Elasticsearch
import pandas as pd

PORT = 9250
TIMEOUT = 10000
USR = "someusr"
PW = "somepw"
HOST = "test.com"
INDEX = "my_index"
QUERY = "foobar"

client = Elasticsearch([HOST], port = PORT, http_auth=(USR, PW), timeout = TIMEOUT)

qs = QueryString(query = QUERY)
s = Search(using=client, index=INDEX).query(qs)

s = s.params(size = 0)

agg= {
    "dates" : A("date_histogram", field="date", interval="1M", time_zone="Europe/Berlin"),
    "region" : A("terms", field="region", size=10),
    "county" : A("terms", field="county", size = 10)
}

s.aggs.bucket("dates", agg["dates"]). \
       bucket("region", agg["region"]). \
       bucket("county", agg["county"])

resp = s.execute()

data = {"buckets" : [i._d_ for i in resp.aggregations.dates]}
rec_list = ["buckets"] + [*agg.keys()]

def get_fields(i, lvl):
    return {(k + f"{lvl}"):v for k, v in i.items() if k not in rec_list}

def one_lvl(data, tmp, lvl, rows, maxlvl):
    tmp = {**tmp, **get_fields(data, lvl)}

    if "buckets" not in data:
        rows.append(tmp)

    for d in data:
        if d in ["buckets"]:
            for v, b in enumerate(data[d]):
                tmp = {**tmp, **get_fields(data[d][v], lvl)}
                for k in b:
                    if k in agg.keys():
                        one_lvl(data[d][v][k], tmp, lvl+1, rows, maxlvl)
                    else:
                        if lvl == maxlvl:
                            tmp = {**tmp, (k + f"{lvl}") : data[d][v][k]}
                            rows.append(tmp)

    return rows


rows = one_lvl(data, {}, 1, [], len(agg))
df = pd.DataFrame(rows)