TheDonk

5 years ago
Replied to the topic created by TheDonk » Elasticsearch aggregations to pandas DataFrame

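Having struggled with the very same problem, I've come to believe the cause is that the response is not a normal dict but an elasticsearch_dsl.utils.AttrList of elasticsearch_dsl.utils.AttrDict objects.

If you have an AttrList of AttrDicts, you can do the following: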

resp_dict = response.aggregations.name.buckets
new_response = [i._d_ for i in resp_dict]

to get a list of plain dicts. That will probably play nicer with other libraries.
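To illustrate why plain dicts are easier to hand to other code, here is a minimal sketch that writes a list of bucket dicts to CSV using only the standard library. The sample buckets and the buckets_to_csv helper are made up for illustration; in practice the input would be the new_response list from above.

```python
import csv
import io

# Hypothetical stand-in for new_response: plain dicts, one per bucket.
new_response = [
    {"key": "north", "doc_count": 12},
    {"key": "south", "doc_count": 7},
]

def buckets_to_csv(buckets):
    """Serialize a list of flat bucket dicts to CSV text."""
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=list(buckets[0]), lineterminator="\n")
    writer.writeheader()
    writer.writerows(buckets)
    return out.getvalue()

csv_text = buckets_to_csv(new_response)
print(csv_text)
```

The same list can also be passed straight to pd.DataFrame(new_response), since pandas accepts a list of dicts.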

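Edit:

I wrote a recursive function that handles at least some cases, but it has not been tested extensively and is not wrapped in a nice module or anything. It's just a script. The one_lvl function carries the fields gathered so far in a dictionary named tmp and recurses into each nested named aggregation.

The lvl argument is needed because names can repeat across levels (key, for instance, appears in every bucket), so each field name gets its level number appended.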
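A small sketch of the naming problem, using two made-up buckets from different nesting levels. The suffix_fields helper is hypothetical and only mirrors the idea of appending the level to each field name; it is not part of the script.

```python
def suffix_fields(bucket, lvl, skip=("buckets",)):
    """Return the bucket's fields with the level number appended to each name."""
    return {f"{name}{lvl}": value for name, value in bucket.items() if name not in skip}

# Hypothetical buckets from two nesting levels; both carry "key" and "doc_count".
outer = {"key": "2020-01", "doc_count": 30}
inner = {"key": "north", "doc_count": 12}

# Merging without a suffix lets the inner bucket overwrite the outer one.
clobbered = {**outer, **inner}

# Merging with a per-level suffix keeps both sets of fields.
merged = {**suffix_fields(outer, 1), **suffix_fields(inner, 2)}

print(clobbered)
print(merged)
```

Without the suffix, the outer date bucket's key and doc_count are silently lost, which is why the row-building code tags every field with its level.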

#!/usr/bin/env python3

from elasticsearch_dsl.query import QueryString
from elasticsearch_dsl import Search, A
from elasticsearch import Elasticsearch
import pandas as pd

# Connection settings and query (placeholders)
PORT = 9250
TIMEOUT = 10000
USR = "someusr"
PW = "somepw"
HOST = "test.com"
INDEX = "my_index"
QUERY = "foobar"

client = Elasticsearch([HOST], port=PORT, http_auth=(USR, PW), timeout=TIMEOUT)

qs = QueryString(query=QUERY)
s = Search(using=client, index=INDEX).query(qs)

# size=0: only the aggregations are needed, not the hits
s = s.params(size=0)

# Named aggregations, listed from the outermost to the innermost level
agg = {
    # on Elasticsearch 7.2+ use calendar_interval instead of interval
    "dates": A("date_histogram", field="date", interval="1M", time_zone="Europe/Berlin"),
    "region": A("terms", field="region", size=10),
    "county": A("terms", field="county", size=10),
}

# Nest the aggregations: dates -> region -> county
(s.aggs.bucket("dates", agg["dates"])
       .bucket("region", agg["region"])
       .bucket("county", agg["county"]))

resp = s.execute()

# Unwrap the top-level buckets into plain dicts
data = {"buckets": [i._d_ for i in resp.aggregations.dates]}
# Keys that hold nested structure rather than scalar fields
rec_list = ["buckets"] + list(agg)

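def get_fields(bucket, lvl):
    # Copy the scalar fields, appending the level number to each name
    return {f"{k}{lvl}": v for k, v in bucket.items() if k not in rec_list}

def one_lvl(data, tmp, lvl, rows, maxlvl):
    # tmp carries the fields collected from the parent levels
    tmp = {**tmp, **get_fields(data, lvl)}

    if "buckets" not in data:
        # Nothing to descend into: emit the row as it stands
        rows.append(tmp)
        return rows

    for bucket in data["buckets"]:
        # Start from the parent fields so siblings do not leak into each other
        row = {**tmp, **get_fields(bucket, lvl)}
        nested = [k for k in bucket if k in agg]
        for k in nested:
            one_lvl(bucket[k], row, lvl + 1, rows, maxlvl)
        if not nested and lvl == maxlvl:
            # One row per leaf bucket (the loop over keys used to append one per field)
            rows.append(row)

    return rows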


rows = one_lvl(data, {}, 1, [], len(agg))
df = pd.DataFrame(rows)
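
The script above needs a live cluster, so here is a self-contained sketch of the same flattening idea run against a hand-written, simplified two-level response. The flatten generator and the sample data are assumptions made for illustration, not the script's own code: unlike one_lvl, it ignores aggregation-level metadata such as sum_other_doc_count and emits a row at any leaf bucket regardless of depth.

```python
def flatten(node, agg_names, lvl=1, parent=None):
    """Yield one flat dict per leaf bucket of a nested bucket aggregation."""
    parent = parent or {}
    for bucket in node["buckets"]:
        row = dict(parent)  # copy, so sibling buckets do not share state
        nested = []
        for name, value in bucket.items():
            if name in agg_names:
                nested.append(value)           # a sub-aggregation to descend into
            else:
                row[f"{name}{lvl}"] = value    # scalar field, tagged with its level
        if nested:
            for child in nested:
                yield from flatten(child, agg_names, lvl + 1, row)
        else:
            yield row

# Hypothetical, simplified response (dates -> region).
sample = {"buckets": [
    {"key": "2020-01", "doc_count": 3, "region": {"buckets": [
        {"key": "north", "doc_count": 2},
        {"key": "south", "doc_count": 1},
    ]}},
    {"key": "2020-02", "doc_count": 1, "region": {"buckets": [
        {"key": "north", "doc_count": 1},
    ]}},
]}

rows = list(flatten(sample, {"dates", "region"}))
for row in rows:
    print(row)
```

Each row is a flat dict, so pd.DataFrame(rows) turns the result into a table with one column per level-tagged field.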