社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

ElasticSearch的搭建与数据统计

唤之 • 7 年前 • 721 次点击  

 平台内的产品有一个数据分析,统计平台内某个商户某个时间段内(今天、昨天、7天内、30天内……)的各种数据分析,这种分析显然用MySql的count、sum、GroupBy之类的去查询是很不靠谱的,尤其是在数据量很大的情况下效率就不言而喻了,本来想着用HBase的MR来做,或者直接把各纬度的数据通过HADOOP的MR处理完存到HBase里面,后来与朋友聊天后被朋友严重鄙视了一顿,鄙视的内容基本是嫌弃我们的数据量太小根本用不到HBase更用不到MR,在朋友的极力蛊惑之下决定用ElasticSearch来实现以下简称ES,好吧,那我们还是从传统的搭建-采坑-填坑-再采坑-再填坑开始。

1、下载并安装配置ElasticSearch

 ElasticSearch的官网www.elastic.co/products/el…找到需要的版本,我这里选择的是5.6.9的版本,不要问我为什么,因为最新版在我的未知领域有更多的坑!直接下载5.6.9是我目前用着最舒服的一个版本,ES依赖最低JDK1.8版本,所以环境变量一定要配置好

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.9.tar.gz
tar -zxvf elasticsearch-5.6.9.tar.gz -C /usr/local/
cd /usr/local/elasticsearch-5.6.9

 ES的配置文件全部在config目录下,其中elasticsearch.yml是主配置文件,进去后主要修改几个地方,其他的地方根据业务需求自行修改:

vim config/其中elasticsearch.yml
cluster.name=tsk-es
node.name=tsk1
path.data: /opt/data/elastic/data
path.logs: /opt/data/elastic/log
network.host: 0.0.0.0
http.port: 9200

 不要以为把elasticsearch.yml修改完就可以直接执行bin目录下的elasticsearch,会报一堆错误给你的!第一个错误就是告诉你不能用root用户启动ES,所以你要先创建一个用户,我这里创建一个叫elastic的用户然后记得给用户授文件夹的权限,然后su进入用户启动,但是先别急着进elastic用户,还有些东西需要改一下:

 1、修改/etc/security/limits.conf文件,否则会报max file descriptors [4096] for elasticsearch process likely too low, increase to at least [65536]错误

vim /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536
* soft nproc 2048
* hard nproc 4096

 2、修改/etc/sysctl.conf 文件否则会报max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]错误

vim /etc/sysctl.conf
vm.max_map_count=262144

 全部修改完成之后就可以进入elastic用户启动ES了

su elastic
bin/elasticsrarch

 如果不出意外的话你的ES应该正常运行了,这时用浏览器访问192.168.0.1:9200你就会看到一串JSON字符串,证明你的ES已经启动成功,如果想要后台运行ES直接执行

nohup bin/elasticsearch > /opt/data/elastic/elastic.log 2>&1 &

2、ES操作初了解

 ES的操作都是通过HTTP请求进行的,不同的请求方式和参数针对不同的操作,比如创建一个索引就是PUT,删除一个索引用的是DELETE,查询如果没参数直接用GET就好,如果有参数或者是提交数据的话用POST,那么我们第一步肯定是先创建索引开始:

PUT:http://192.168.0.1:9200/shopsinfo
{
"mappings":{
    "shopsOrder":{
        "properties":{
            "shopid":{
                "type":"string",
                "index": "not_analyzed"
            },
            "createdate":{
                "type":"string",
                "index": "not_analyzed"
            },
            "timestamp":{
                "type":"long"    
            },
            "paymentType":{
                "type":"string"    ,
                "index": "not_analyzed"
            },
            "amount":{
                "type":"long"
            }
        }
    }
}
}

 上面的意思是创建一个名叫shopsinfo的索引,里面有一个叫shopsOrder的mapping,里面有shopid,createdata,timestamp,paymentType,amount几个字段,以及分别对应的type

 插入数据比较简单,就是POST就好参数就是一个JSON

POST:http://192.168.0.1:9200/shopsinfo/shopsOrder
{
    "shopid": "96119",
    "createdate": "20180410",
    "timestamp": 1523289600000,
    "paymentType": "alipay",
    "amount": 6917
}

 删除数据

POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_delete_by_query

 查询
GET/POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_search

 关于ES的操作和其他不再过多赘述,官网有中文版,度娘上也一大堆,重中之重是ES的统计查询这是ES的关键

3、重点之查询

 ES是属于倒排索引,查询的效率特别的高,但是ES的查询语句就很麻烦了,ES不管是查询、统计都是用POST的BODY以JSON的形式进行的,比如我要查询时间戳>某个时间并且shopId为100000002和100000006的在SQL中是这样的:

select * from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")

 在ES中就得这么查:

POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_search
{
    "size":20,
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "timestamp":{
                            "gte":1523671189000
                        }

                    }
                },
                {
                    "terms":{
                        "shopid":["100000002","100000006"]
                    }
                }
            ]
        }
    }
}

 这里面我传递了size参数,如果不传,ES默认给你返回10条数据,查询结果ES也会给你返回JSON,其中hits字段中会有total就是你查询的结果的总数hits会返回给你结果内容。

 以上是简单的查询,统计的话ES是以aggs作为参数,全称应该叫做Aggregation,比如接着刚才的查询我想计算出结果的amount总额是多少就是类似SQL中的

select sum(amount)query_amount from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")

 在ES中就得这么查

{
    "aggs":{
        "query_amount":{
            "sum":{
                "field":"amount"
            }
        }
    },
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "timestamp":{
                            "gte":1523671189000
                        }

                    }
                },
                {
                    "terms":{
                        "shopid":["100000002","100000006"]
                    }
                }
            ]
        }
    }
}

 统计的结果在返回值的aggregations参数里的query_amount下类似这样的:

......
"aggregations": {
    "query_amount": {
        "value": 684854
    }
}
......

 接下来再复杂一点点,按天分组进行统计查询SQL中的提现是这样的:

select createdate,sum(amount)query_amount from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")
group by createdate order by createdate

 在ES中是这样的:

{
    "size":0,
    "aggs":{
        "orderDate":{
            "terms":{
                "field":"createdate",
                "order":{
                    "_term":"asc"
                }
            },
            "aggs":{
                "query_amount":{
                    "sum":{
                        "field":"amount"
                    }
                }
            }
        }
    },
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "timestamp":{
                            "gte":1523671189000
                        }

                    }
                },
                {
                    "terms":{
                        "shopid":["100000002","100000006"]
                    }
                }
            ]
        }
    }
}

 查询结果为

......
"aggregations": {
    "orderDate": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 99,
        "buckets": [
            ......
            {
                "key": "20180415",
                "doc_count": 8,
                "query_amount": {
                    "value": 31632
                }
            },
            {
                "key": "20180417",
                "doc_count": 3,
                "query_amount": {
                    "value": 21401
                }
            },
            {
                "key": "20180418",
                "doc_count": 2,
                "query_amount": {
                    "value": 2333
                }
            }
            ......
        ]
    }
}

 buckets中就是查询的结果,key为按我createdate分组后的值,doc_count类似count,query_amount为sum后的值。至于我的参数里面有一个size:0是因为我不需要具体的记录就是hits,所以这里传0

 最后我们来个更复杂的1、统计所有的总额;2、先按paymentType支付方式分组统计amount总额,并且每个支付方式中再按天分组统计每天的amount总额




    
    {
    "size":0,
    "aggs":{
        "amount":{
            "sum":{
                "field":"amount"
            }
        },
        "paymenttype":{
            "terms":{
                "field":"paymentType"
            },
            "aggs":{
                "query_amount":{
                    "sum":{
                        "field":"amount"
                    }
                },
                "payment_date":{
                    "terms":{
                        "field":"createdate"
                    },
                    "aggs":{
                        "query_amount":{
                            "sum":{
                                "field":"amount"
                            }
                        }
                    }
                }
            }
        }
    },
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "timestamp":{
                            "gte":1523671189000
                        }

                    }
                },
                {
                    "terms":{
                        "shopid":["100000002","100000006"]
                    }
                }
            ]
        }
    }
}

 查询结果为:

......
"amount": {
    "value": 684854
},
"paymenttype":{
     ......
    "buckets": [
        {
            "key": "wechatpay",
            "doc_count": 73,
            "amount": {
                "value": 351142
            },
            "payment_date": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 25,
                "buckets": [
                    ......
                    {
                        "key": "20180415",
                        "doc_count": 6,
                        "amount": {
                            "value": 29032
                        }
                    },
                    {
                        "key": "20180425",
                        "doc_count": 6,
                        "amount": {
                            "value": 21592
                        }
                    }
                    ......
                ]
            }
        },
        {
            "key": "alipay",
            "doc_count": 67,
            "amount": {
                "value": 333712
            },
            "payment_date": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 23,
                "buckets": [
                    ......
                    {
                        "key": "20180506",
                        "doc_count": 8,
                        "amount": {
                            "value": 38280
                        }
                    },
                    {
                        "key": "20180426",
                        "doc_count": 6,
                        "amount": {
                            "value": 41052
                        }
                    }
                    ......
                ]
            }
        }
    ]
}

4、JAVA操作ES

 根据自己下载的ES版本下载对应版本的JAR包,我安装的是5.6.9所以我的JAR包版本也应该是5.6.9

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.6.9</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.6.9</version>
</dependency>

 创建一个helper操作ES,由于我的ES项目是基于SpringBoot的所以我的helper决定交由spring去管理,其实写一个单例也是可以的,先创建client连接

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.net.InetAddress;

Settings settings = Settings.builder().put("cluster.name", "tsk-es").put("client.transport.sniff", true)
                    .build();
TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddresses(new InetSocketTransportAddress(InetAddress.getByName(HOST), PORT));

 插入数据比较简单你可以直接插入JSON字符串,也可以传递JAVA BEAN

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.xcontent.XContentType;

IndexResponse response = client.prepareIndex(index, mapping).setSource(jsonStr, XContentType.JSON)
                .get();

 查询就比较麻烦了,已上面最后一个查询先按paymentType支付方式分组统计amount总额,并且每个支付方式中再按天分组统计每天的amount总额为例:

public void getAmountData(Long startTimestamp, String... shopIds) {
    // 先初始化一个SearchRequestBuilder,指向shopsInfo/shopsOrder
    SearchRequestBuilder sbuilder = client.prepareSearch("shopsinfo").setTypes("shopsOrder");
    // 条件一
    TermsQueryBuilder mpq = QueryBuilders.termsQuery("shopid", shopIds);
    // 条件二
    RangeQueryBuilder mpq2 = QueryBuilders.rangeQuery("timestamp").gte(startTimestamp);
    // 初始化QueryBuilder
    QueryBuilder queryBuilder = QueryBuilders.boolQuery().must(mpq).must(mpq2);
    // 将QueryBuilder放入SearchRequestBuilder
    sbuilder.setQuery(queryBuilder);
    sbuilder.setSize(0);

    // sum,这里创建一个实例全部用这个实例就行
    SumAggregationBuilder salaryAgg = AggregationBuilders.sum("query_amount").field("amount");

    TermsAggregationBuilder paymentAgg = AggregationBuilders.terms("paymentType").field("paymentType");
    paymentAgg.size(100);
    paymentAgg.subAggregation(salaryAgg);
    TermsAggregationBuilder groupDateAff = AggregationBuilders.terms("payment_date").field("createdate")
            .order(Order.term(true));
    groupDateAff.size(100);
    groupDateAff.subAggregation(salaryAgg);
    paymentAgg.subAggregation(groupDateAff);
    // 将统计查询放入SearchRequestBuilder
    sbuilder.addAggregation(salaryAgg).addAggregation(paymentAgg);
    SearchResponse response = sbuilder.execute().actionGet();
    Map<String, Aggregation> aggMap = response.getAggregations().asMap();
    // 获取全部的总额
    InternalSum shopGroupAllAmount = (InternalSum) aggMap.get("amount");
    Double amount = shopGroupAllAmount.getValue();
    ......
}

 SearchResponse中就已经可以获取到全部的信息,至于后续怎么解析数据那就看具体业务需求以及每个人的习惯。ES的各种操作说简单也简单说复杂也复杂,按照朋友的话说就是用熟了自然就简单,确实也是这样,不管用啥都得深入了解一下,要不然自己就是个坑!比如我在做统计查询的时候返回的结果总是比应该的结果要少很多,而少的数量总是出现在sum_other_doc_count这个字段里,研究了半天才发现原来统计的结果也需要传递size参数,否则一样默认10条!

 最后,还是要感谢一下我那朋友的,虽然他对我想构建各种大数据平台的事情嗤之以鼻(因为我们数据量确实不大),但依然输出了一下他所能想到的最优解。

坚持原创技术分享,您的支持将鼓励我继续创作! 天狼武士 WeChat Pay

微信打赏

天狼武士 Alipay

支付宝打赏


今天看啥 - 高品质阅读平台
本文地址:http://www.jintiankansha.me/t/4FKmFOmf0T
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/12019
 
721 次点击