Using the ElasticSearch Java API


Thanks to Quanke's ElasticSearch guide, from which most of this material is drawn.

ElasticSearch

Comparing MySQL and ElasticSearch

MySQL                  ElasticSearch
Database               Index
Table                  Type
Row                    Document
Column                 Field
Schema                 Mapping
Index                  Everything indexed by default (all fields are indexed)
SQL                    Query DSL
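
To make the mapping concrete, here is a sketch (index and field names are illustrative) of how a simple SQL query corresponds to the Query DSL through the Java client set up later in this article:

    // SQL:  SELECT * FROM blog WHERE user = 'hhh'
    // Query DSL equivalent:
    SearchResponse response = client.prepareSearch("blog")          // FROM blog (the index)
            .setQuery(QueryBuilders.matchQuery("user", "hhh"))      // WHERE user = 'hhh'
            .get();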

Document APIs

Index API

The Index API stores a JSON document in an index, making it searchable. A document is uniquely identified by its index, type, and id. You can supply your own id, or let the Index API generate one for you.
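
A quick sketch of both id options, where json is the JSON string from the manual example below:

    // explicit id: the document is stored at /fendo/fendodate/1
    client.prepareIndex("fendo", "fendodate", "1").setSource(json).get();

    // no id given: ElasticSearch generates one, readable from the response
    IndexResponse response = client.prepareIndex("fendo", "fendodate").setSource(json).get();
    System.out.println(response.getId());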

There are four different ways to produce the JSON document:

  • manually, using a raw byte[] or String
  • using a Map, which is automatically converted to its JSON equivalent
  • using a third-party library such as Jackson or FastJSON to serialize beans
  • using the built-in helper XContentFactory.jsonBuilder()

Manual approach

    /**
     * Manual approach: build the JSON string by hand
     * @throws UnknownHostException
     */
    @Test
    public void JsonDocument() throws UnknownHostException {
        String json = "{" +
                "\"user\":\"deepredapple\"," +
                "\"postDate\":\"2018-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        IndexResponse indexResponse = client.prepareIndex("fendo", "fendodate").setSource(json).get();
        System.out.println(indexResponse.getResult());
    }

Map approach

    /**
     * Map approach
     */
    @Test
    public void MapDocument() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "hhh");
        json.put("postDate", "2018-06-28");
        json.put("message", "trying out Elasticsearch");
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(response.getResult());
    }

Serialization approach

    /**
     * Serialize with Jackson
     */
    @Test
    public void JACKSONDocument() throws JsonProcessingException {
        Blog blog = new Blog();
        blog.setUser("123");
        blog.setPostDate("2018-06-29");
        blog.setMessage("try out ElasticSearch");

        ObjectMapper mapper = new ObjectMapper();
        byte[] bytes = mapper.writeValueAsBytes(blog);
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(bytes).get();
        System.out.println(response.getResult());
    }

XContentBuilder helper approach

    /**
     * Using the XContentBuilder helper
     */
    @Test
    public void XContentBuilderDocument() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
                .field("user", "xcontentdocument")
                .field("postDate", "2018-06-30")
                .field("message", "this is ElasticSearch").endObject();
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(builder).get();
        System.out.println(response.getResult());
    }

Complete example

package com.deepredapple.es.document;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @author DeepRedApple
 */
public class TestClient {

    TransportClient client = null;

    public static final String INDEX = "fendo";

    public static final String TYPE = "fendodate";

    @Before
    public void beforeClient() throws UnknownHostException {
        client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    }

    /**
     * Manual approach: build the JSON string by hand
     * @throws UnknownHostException
     */
    @Test
    public void JsonDocument() throws UnknownHostException {
        String json = "{" +
                "\"user\":\"deepredapple\"," +
                "\"postDate\":\"2018-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        IndexResponse indexResponse = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(indexResponse.getResult());
    }

    /**
     * Map approach
     */
    @Test
    public void MapDocument() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "hhh");
        json.put("postDate", "2018-06-28");
        json.put("message", "trying out Elasticsearch");
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(response.getResult());
    }

    /**
     * Serialize with Jackson
     */
    @Test
    public void JACKSONDocument() throws JsonProcessingException {
        Blog blog = new Blog();
        blog.setUser("123");
        blog.setPostDate("2018-06-29");
        blog.setMessage("try out ElasticSearch");

        ObjectMapper mapper = new ObjectMapper();
        byte[] bytes = mapper.writeValueAsBytes(blog);
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(bytes).get();
        System.out.println(response.getResult());
    }

    /**
     * Using the XContentBuilder helper
     */
    @Test
    public void XContentBuilderDocument() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
                .field("user", "xcontentdocument")
                .field("postDate", "2018-06-30")
                .field("message", "this is ElasticSearch").endObject();
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(builder).get();
        System.out.println(response.getResult());
    }

}

Get API

The Get API retrieves a document by id:

GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();

The parameters are the index, the type, and the _id.

Operation threading

Setting setOperationThreaded to true executes the operation on a different thread.

    /**
     * Get API
     */
    @Test
    public void testGetApi() {
        GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").setOperationThreaded(false).get();
        Map<String, Object> map = getResponse.getSource();
        Set<String> keySet = map.keySet();
        for (String str : keySet) {
            Object o = map.get(str);
            System.out.println(o.toString());
        }
    }

Delete API

Delete by id:

DeleteResponse deleteResponse = client.prepareDelete(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();

The parameters are the index, the type, and the _id.

Operation threading

Setting setOperationThreaded to true executes the operation on a different thread.

    /**
     * deleteAPI
     */
    @Test
    public void testDeleteAPI() {
        GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").setOperationThreaded(false).get();
        System.out.println(getResponse.getSource());
        DeleteResponse deleteResponse = client.prepareDelete(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
        System.out.println(deleteResponse.getResult());
    }

Delete By Query API

Deletes the documents matching a query.

    /**
     * Delete by query
     */
    @Test
    public void deleteByQuery() {
        BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("user", "hhh")) // query condition
                .source(INDEX).get(); // index name
        long deleted = response.getDeleted(); // number of documents deleted
        System.out.println(deleted);
    }

Parameters: QueryBuilders.matchQuery("user", "hhh") takes the field and the value to match; source(INDEX) takes the index name.

Asynchronous callbacks

When a delete takes a long time, it can be executed asynchronously with a callback; the result is obtained inside the callback.

    /**
     * Delete with a callback; suited to deleting large volumes of data
     */
    @Test
    public void DeleteByQueryAsync() {
        for (int i = 1300; i < 3000; i++) {
            DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("user", "hhh " + i))
                .source(INDEX)
                .execute(new ActionListener<BulkByScrollResponse>() {
                    public void onResponse(BulkByScrollResponse response) {
                        long deleted = response.getDeleted();
                        System.out.println("documents deleted = " + deleted);
                    }

                    public void onFailure(Exception e) {
                        System.out.println("Failure");
                    }
                });
        }
    }

Even after the program stops, the ElasticSearch console shows the deletes still being applied; the operation executes asynchronously.

The callback is registered through the execute method:

    .execute(new ActionListener<BulkByScrollResponse>() { // callback
      public void onResponse(BulkByScrollResponse response) {
        long deleted = response.getDeleted();
        System.out.println("documents deleted = " + deleted);
      }

      public void onFailure(Exception e) {
        System.out.println("Failure");
      }
    });

Update API

Updating an index

There are two main ways to perform an update:

  • create an UpdateRequest and send it via the client
  • use the prepareUpdate() method

Using UpdateRequest

    /**
     * Update with an UpdateRequest
     */
    @Test
    public void testUpdateAPI() throws IOException, ExecutionException, InterruptedException {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index(INDEX);
        updateRequest.type(TYPE);
        updateRequest.id("AWRFv-yAro3r8sDxIpib");
        updateRequest.doc(jsonBuilder()
                .startObject()
                    .field("user", "hhh")
                .endObject());
        client.update(updateRequest).get();
    }

Using prepareUpdate()

    /**
     * Using prepareUpdate
     */
    @Test
    public void testUpdatePrepareUpdate() throws IOException {
        client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
                .setScript(new Script("ctx._source.user = \"DeepRedApple\"")).get();
        client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
                .setDoc(jsonBuilder()
                .startObject()
                    .field("user", "DeepRedApple")
                .endObject()).get();
    }

The parameters of setScript in client.prepareUpdate differ between versions; here the script source is passed inline, but you can also execute a script stored in a file and update with its result.
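
For example, a minimal sketch of running a stored script instead of inline source; this assumes a script with id "update-user" has already been stored for the painless language (the Script constructor signature varies across client versions):

    Map<String, Object> params = Collections.singletonMap("name", "DeepRedApple");
    client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
            .setScript(new Script(ScriptType.STORED, "painless", "update-user", params))
            .get();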

Update By Script

Update a document using a script.

    /**
     * Update via a script
     */
    @Test
    public void testUpdateByScript() throws ExecutionException, InterruptedException {
        UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIpia")
                .script(new Script("ctx._source.user = \"LZH\""));
        client.update(updateRequest).get();
    }

Upsert

Updates the document if it exists; inserts it otherwise.

    /**
     * Update the document if it exists, otherwise insert it
     */
    @Test
    public void testUpsert() throws IOException, ExecutionException, InterruptedException {
        IndexRequest indexRequest = new IndexRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIp12")
                .source(jsonBuilder()
                    .startObject()
                        .field("user", "hhh")
                        .field("postDate", "2018-02-14")
                        .field("message", "ElasticSearch")
                    .endObject());
        UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIp12")
                .doc(jsonBuilder()
                    .startObject()
                        .field("user", "LZH")
                    .endObject())
                .upsert(indexRequest); // if the document does not exist, index indexRequest instead
        client.update(updateRequest).get();
    }

If the given _id exists, i.e. index/type/_id exists, the UpdateRequest is executed; if index/type/_id does not exist, the document is inserted directly.

Multi Get API

Fetches multiple documents in a single request.

    /**
     * Fetch multiple documents at once
     */
    @Test
    public void TestMultiGetApi() {
        MultiGetResponse responses = client.prepareMultiGet()
                .add(INDEX, TYPE, "AWRFv-yAro3r8sDxIpib") // by a single id
                .add(INDEX, TYPE, "AWRFvA7k0udstXU4tl60", "AWRJA72Uro3r8sDxIpip") // by multiple ids
                .add("blog", "blog", "AWG9GKCwhg1e21lmGSLH") // fetch from another index
                .get();
        for (MultiGetItemResponse itemResponse : responses) {
            GetResponse response = itemResponse.getResponse();
            if (response.isExists()) {
                String source = response.getSourceAsString(); //_source
                JSONObject jsonObject = JSON.parseObject(source);
                Set<String> sets = jsonObject.keySet();
                for (String str : sets) {
                    System.out.println("key -> " + str);
                    System.out.println("value -> "+jsonObject.get(str));
                    System.out.println("===============");
                }
            }
        }
    }

Bulk API

The Bulk API indexes multiple documents in a single request.

    /**
     * Bulk indexing
     */
    @Test
    public void testBulkApi() throws IOException {
        BulkRequestBuilder requestBuilder = client.prepareBulk();
        requestBuilder.add(client.prepareIndex(INDEX, TYPE, "1")
            .setSource(jsonBuilder()
                .startObject()
                    .field("user", "张三")
                    .field("postDate", "2018-05-01")
                    .field("message", "zhangSan message")
                .endObject()));
        requestBuilder.add(client.prepareIndex(INDEX, TYPE, "2")
            .setSource(jsonBuilder()
                .startObject()
                    .field("user", "李四")
                    .field("postDate", "2016-09-10")
                    .field("message", "Lisi message")
                .endObject()));
        BulkResponse bulkResponse = requestBuilder.get();
        if (bulkResponse.hasFailures()) {
            System.out.println("error");
        }
    }

Using Bulk Processor

The BulkProcessor offers a simple interface that automatically flushes bulk operations based on the number or size of queued requests, or on a schedule.

Creating a BulkProcessor instance

First, create the BulkProcessor instance:

    /**
     * Create a BulkProcessor instance
     */
    @Test
    public void testCreateBulkProcessor() {
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            // called before each bulk execution; e.g. request.numberOfActions() gives the number of actions
            public void beforeBulk(long l, BulkRequest request) {
                
            }

            // called after each bulk execution; e.g. response.hasFailures() reports whether it failed
            public void afterBulk(long l, BulkRequest request, BulkResponse response) {
                
            }

            // called when a bulk request fails with a Throwable
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {

            }
        }).setBulkActions(10000) // execute the bulk every 10,000 requests
          .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush once 5MB of data is queued
          .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds regardless of the number of requests
          .setConcurrentRequests(1) // number of concurrent requests: 0 executes a single request at a time, 1 allows one concurrent request
          .setBackoffPolicy(
                  BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                // custom retry policy: wait 100ms at first, doubling on each retry, up to 3 retries; retries
                // are triggered when a bulk fails with EsRejectedExecutionException (insufficient compute
                // resources); disable retries with BackoffPolicy.noBackoff()
          .build();
    }

BulkProcessor defaults

  • bulkActions: 1000
  • bulkSize: 5mb
  • flushInterval: not set
  • concurrentRequests: 1, meaning asynchronous execution
  • backoffPolicy: 8 retries, starting at 50ms
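
A minimal builder that keeps all of these defaults is just the following sketch, where listener stands for a BulkProcessor.Listener like the one in the example below:

    BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).build();
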
    /**
     * Create a BulkProcessor instance and add requests to it
     */
    @Test
    public void testCreateBulkProcessor() throws IOException {
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            // called before each bulk execution; e.g. request.numberOfActions() gives the number of actions
            public void beforeBulk(long l, BulkRequest request) {

            }

            // called after each bulk execution; e.g. response.hasFailures() reports whether it failed
            public void afterBulk(long l, BulkRequest request, BulkResponse response) {

            }

            // called when a bulk request fails with a Throwable
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {

            }
        }).setBulkActions(10000) // execute the bulk every 10,000 requests
          .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush once 5MB of data is queued
          .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds regardless of the number of requests
          .setConcurrentRequests(1) // number of concurrent requests: 0 executes a single request at a time, 1 allows one concurrent request
          .setBackoffPolicy(
                  BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                // custom retry policy: wait 100ms at first, doubling on each retry, up to 3 retries; retries
                // are triggered when a bulk fails with EsRejectedExecutionException (insufficient compute
                // resources); disable retries with BackoffPolicy.noBackoff()
          .build();

        // add requests
        bulkProcessor.add(new IndexRequest(INDEX, TYPE, "3").source(
                jsonBuilder()
                        .startObject()
                            .field("user", "王五")
                            .field("postDate", "2019-10-05")
                            .field("message", "wangwu message")
                        .endObject()));
        bulkProcessor.add(new DeleteRequest(INDEX, TYPE, "1"));
        bulkProcessor.flush();
        // close the bulkProcessor
        bulkProcessor.close();
        client.admin().indices().prepareRefresh().get();
        client.prepareSearch().get();
    }

Search API

The Search API executes a search query and returns results matching the query. It can search one or more index/type pairs, and the Query Java API can be used to build the query conditions.

Java also provides the QUERY_AND_FETCH and DFS_QUERY_AND_FETCH search types, but these modes should be chosen by the system rather than specified manually by users.

Example:

    @Test
    public void testSearchApi() {
        SearchResponse response = client.prepareSearch(INDEX).setTypes(TYPE)
                .setQuery(QueryBuilders.matchQuery("user", "hhh")).get();
        SearchHit[] hits = response.getHits().getHits();
        for (int i = 0; i < hits.length; i++) {
            String json = hits[i].getSourceAsString();
            JSONObject object = JSON.parseObject(json);
            Set<String> strings = object.keySet();
            for (String str : strings) {
                System.out.println(object.get(str));
            }
        }
    }

Using scrolls in Java

An ordinary search request returns a single page of data, however large the result set. The Scrolls API allows us to retrieve a large amount of data (even all of it): it performs an initial search and then keeps pulling batches of results from ElasticSearch until no results are left. The Scroll API is not designed for real-time user requests, but for processing large volumes of data.

  /**
   * Scroll query
   * @throws ExecutionException
   * @throws InterruptedException
   */
  @Test
  public void testScrollApi() throws ExecutionException, InterruptedException {
      MatchQueryBuilder qb = matchQuery("user", "hhh");
      SearchResponse response = client.prepareSearch(INDEX).addSort(FieldSortBuilder.DOC_FIELD_NAME,
              SortOrder.ASC)
              .setScroll(new TimeValue(60000)) // to use scrolling, the initial search must set the scroll parameter, telling ElasticSearch how long to keep the search context alive
              .setQuery(qb)
              .setSize(100).get();
      do {
          for (SearchHit hit : response.getHits().getHits()) {
              String json = hit.getSourceAsString();
              JSONObject object = JSON.parseObject(json);
              Set<String> strings = object.keySet();
              for (String str : strings) {
                  System.out.println(object.get(str));
              }
          }
          response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().get();
      } while (response.getHits().getHits().length != 0);
  }

If the scroll time is exceeded and you continue to search with that scroll id, an error is raised.

Although the search context is cleared automatically once the scroll time elapses, keeping scrolls open has a high cost, so use the Clear-Scroll API to release a scroll as soon as it is no longer needed.

Clearing a scroll id

        ClearScrollRequestBuilder clearBuilder = client.prepareClearScroll();
        clearBuilder.addScrollId(response.getScrollId());
        ClearScrollResponse scrollResponse = clearBuilder.get();
        System.out.println("cleared successfully: " + scrollResponse.isSucceeded());

MultiSearch API

The MultiSearch API executes multiple search requests within a single API call. Its endpoint is _msearch.

    @Test
    public void testMultiSearchApi() {
        SearchRequestBuilder srb1 = client.prepareSearch().setQuery(QueryBuilders.queryStringQuery("elasticsearch")).setSize(1);
        SearchRequestBuilder srb2 = client.prepareSearch().setQuery(QueryBuilders.matchQuery("user", "hhh")).setSize(1);
        MultiSearchResponse multiSearchResponse = client.prepareMultiSearch().add(srb1).add(srb2).get();
        long nbHits = 0;
        for (MultiSearchResponse.Item item : multiSearchResponse.getResponses()) {
            SearchResponse response = item.getResponse();
            nbHits += response.getHits().getTotalHits();
        }
        System.out.println(nbHits);
    }

Using Aggregations

The aggregations framework helps provide aggregated data based on a search query. It is built on simple building blocks called aggregations, which assemble complex summaries of the data in an orderly way. An aggregation can be seen as the umbrella term for the work of extracting analytic information from a set of documents; implementing one amounts to defining that document set.

    @Test
    public void testAggregations() {
        SearchResponse searchResponse = client.prepareSearch()
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(AggregationBuilders.terms("LZH").field("user"))
                .addAggregation(AggregationBuilders.dateHistogram("2013-01-30").field("postDate")
                        .dateHistogramInterval(DateHistogramInterval.YEAR)).get();
        Terms lzh = searchResponse.getAggregations().get("LZH"); // fetched by the aggregation's name
        Histogram postDate = searchResponse.getAggregations().get("2013-01-30");

    }

Terminate After

Caps the number of documents to collect. If set, use isTerminatedEarly() on the SearchResponse to determine whether the number of returned documents reached the configured count.

    @Test
    public void TestTerminate() {
        SearchResponse searchResponse = client.prepareSearch(INDEX)
                .setTerminateAfter(2) // terminate early once this many documents are collected
                .get();
        if (searchResponse.isTerminatedEarly()) {
            System.out.println(searchResponse.getHits().totalHits);
        }
    }

Aggregations

ElasticSearch provides a complete Java API for aggregations. Build aggregations with AggregationBuilders and add them to the search request.

SearchResponse response = client.prepareSearch().setQuery(/* query */).addAggregation(/* aggregation */).execute().actionGet();

Structuring aggregations


Metrics aggregations

Metrics aggregations compute measures based on values extracted, in one way or another, from the documents being aggregated.

The main class used here is AggregationBuilders, which provides factory methods for all of the aggregations below; call them directly.

Min Aggregation

    MinAggregationBuilder aggregation = AggregationBuilders.min("agg").field("age");

    SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();
    Min agg = sr.getAggregations().get("agg");
    String value = agg.getValueAsString(); // string form, mainly useful for date fields; use agg.getValue() for the plain numeric minimum
    System.out.println("min value:" + value);

In debug mode

toString() on the MinAggregationBuilder from the first line produces:

{
  "error": "JsonGenerationException[Can not write a field name, expecting a value]"
}
SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();

toString() on the SearchResponse produces the JSON below, which is the query result. Reading this JSON structure together with the SearchResponse API lets you extract each value in the response.

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "10",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:10:21.396Z",
          "age": 30,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:05:33.943Z",
          "age": 20,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T08:59:00.191Z",
          "age": 10,
          "gender": "male",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "11",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:10:54.386Z",
          "age": 30,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      }
    ]
  },
  "aggregations": {
    "agg": {
      "value": 10.0
    }
  }
}

Observe that sr.getAggregations().get("agg") retrieves the aggregated statistic; the name agg used throughout this code is user-defined.

Max Aggregation

    MaxAggregationBuilder aggregation = AggregationBuilders.max("agg").field("readSize");

    SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
    Max agg = sr.getAggregations().get("agg");
    String value = agg.getValueAsString();

    System.out.println("max value:" + value);

The analysis is the same as for the Min Aggregation, but note that neither tells you which document holds the maximum or minimum value.
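
If you do need to know which document holds the extreme value, one simple alternative (a sketch, reusing the blog index and readSize field from above) is to sort the search itself:

    // the document with the maximum readSize is the first hit when
    // sorting descending with size 1
    SearchResponse top = client.prepareSearch("blog")
            .addSort("readSize", SortOrder.DESC)
            .setSize(1)
            .get();
    System.out.println(top.getHits().getAt(0).getSourceAsString());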

Sum Aggregation

    SumAggregationBuilder aggregation = AggregationBuilders.sum("agg").field("readSize");

    SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
    Sum agg = sr.getAggregations().get("agg");
    String value = agg.getValueAsString();

    System.out.println("sum value:" + value);

Avg Aggregation

AvgAggregationBuilder aggregation = AggregationBuilders.avg("agg").field("age");
SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
Avg avg = searchResponse.getAggregations().get("agg");
String value = avg.getValueAsString();
System.out.println("avg value: "+ value);

Stats Aggregation

A stats aggregation computes several statistics (min, max, sum, count, avg) over a document value. The value used for the computation can be a specific numeric field, or be computed by a script.

        StatsAggregationBuilder aggregation = AggregationBuilders.stats("agg").field("age");
        SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
        Stats stats = searchResponse.getAggregations().get("agg");
        String max = stats.getMaxAsString();
        String min = stats.getMinAsString();
        String avg = stats.getAvgAsString();
        String sum = stats.getSumAsString();
        long count = stats.getCount();
        System.out.println("max value: "+max);
        System.out.println("min value: "+min);
        System.out.println("avg value: "+avg);
        System.out.println("sum value: "+sum);
        System.out.println("count value: "+count);

This aggregation yields all of the common statistics above in one pass; use it when you need most of those values at once.

Extended Stats Aggregation

An extended stats aggregation adds sum_of_squares, variance, std_deviation, and std_deviation_bounds to the plain stats aggregation. The value used for the computation can be a specific numeric field, or be computed by a script. The main extra results are the variance, standard deviation, and sum of squares.

        ExtendedStatsAggregationBuilder aggregation = AggregationBuilders.extendedStats("agg").field("age");
        SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
        ExtendedStats extended = response.getAggregations().get("agg");
        String max = extended.getMaxAsString();
        String min = extended.getMinAsString();
        String avg = extended.getAvgAsString();
        String sum = extended.getSumAsString();
        long count = extended.getCount();
        double stdDeviation = extended.getStdDeviation();
        double sumOfSquares = extended.getSumOfSquares();
        double variance = extended.getVariance();
        System.out.println("max value: " +max);
        System.out.println("min value: " +min);
        System.out.println("avg value: " +avg);
        System.out.println("sum value: " +sum);
        System.out.println("count value: " +count);
        System.out.println("stdDeviation value: " +stdDeviation);
        System.out.println("sumOfSquares value: " +sumOfSquares);
        System.out.println("variance value: "+variance);

Value Count Aggregation

A value count aggregation counts the number of values extracted from the aggregated documents. The value can come from a specific numeric field, or be computed by a script. This aggregation is generally combined with other single-value aggregations; for example, when computing a field's average you may also want to know how many values that average was computed from (see the sketch after the example below).

ValueCountAggregationBuilder aggregation = AggregationBuilders.count("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
ValueCount count = response.getAggregations().get("agg");
long value = count.getValue();
System.out.println("ValueCount value: " +value);
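
As noted above, this pairs naturally with a single-value aggregation. A sketch combining an average with the count of values it was computed from (the aggregation names avg_age and age_count are arbitrary):

    SearchResponse response = client.prepareSearch("twitter")
            .addAggregation(AggregationBuilders.avg("avg_age").field("age"))
            .addAggregation(AggregationBuilders.count("age_count").field("age"))
            .get();
    Avg avg = response.getAggregations().get("avg_age");
    ValueCount count = response.getAggregations().get("age_count");
    System.out.println("avg " + avg.getValue() + " computed from " + count.getValue() + " values");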

Percentile Aggregation

    PercentilesAggregationBuilder aggregation = AggregationBuilders.percentiles("agg").field("age");
    SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
    Percentiles agg = response.getAggregations().get("agg");
    for (Percentile entry : agg) {
        double percent = entry.getPercent();
        double value = entry.getValue();
        System.out.println("percent value: " + percent + "value value: " + value);
    }

Cardinality Aggregation

Counts distinct values, i.e. the cardinality after removing duplicates:

CardinalityAggregationBuilder aggregation = AggregationBuilders.cardinality("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Cardinality agg = response.getAggregations().get("agg");
long value = agg.getValue();
System.out.println("value value: "+ value);

Top Hits Aggregation

Returns the best-matching documents per bucket:

        TermsAggregationBuilder aggregation = AggregationBuilders.terms("agg").field("gender.keyword")
                .subAggregation(AggregationBuilders.topHits("top").explain(true).size(1).from(10));
        SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
        Terms agg = response.getAggregations().get("agg");
        for (Terms.Bucket bucket : agg.getBuckets()) {
            String key = (String) bucket.getKey();
            long docCount = bucket.getDocCount();
            System.out.println("key value: " + key + " docCount value: " + docCount);
            TopHits topHits = bucket.getAggregations().get("top");
            for (SearchHit searchHitFields : topHits.getHits().getHits()) {
                System.out.println("id value: " + searchHitFields.getId() + " source value: " + searchHitFields.getSourceAsString());
            }
        }

Bucket aggregations

Global Aggregation

Computes a count over all documents, ignoring the search scope:

        AggregationBuilder aggregation = AggregationBuilders
                .global("agg")
                .subAggregation(
                        AggregationBuilders.terms("users").field("user.keyword")
                );

        SearchResponse sr = client.prepareSearch("twitter")
                .addAggregation(aggregation)
                .get();
        System.out.println(sr);
        Global agg = sr.getAggregations().get("agg");
        long count = agg.getDocCount(); // Doc count

        System.out.println("global count:" + count);

Filter Aggregation

Counts the documents matching a filter:

AggregationBuilder aggregation = AggregationBuilders.filters("aaa",
        new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")));

SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
  String key = entry.getKeyAsString();            // bucket key
  long docCount = entry.getDocCount();            // Doc count

  System.out.println("global " + key + " count:" + docCount);
}

Filters Aggregation

Filters on multiple conditions, returning a document count per filter:

AggregationBuilder aggregation = AggregationBuilders.filters("aaa",
        new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")),
        new FiltersAggregator.KeyedFilter("women", QueryBuilders.termQuery("gender", "female")));

SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
  String key = entry.getKeyAsString();            // bucket key
  long docCount = entry.getDocCount();            // Doc count

  System.out.println("global " + key + " count:" + docCount);
}

Missing Aggregation (a field-data-based single-bucket aggregation)

Nested Aggregation

Reverse nested Aggregation

Children Aggregation

Terms Aggregation

        TermsAggregationBuilder fieldAggregation = AggregationBuilders.terms("genders").field("gender.keyword")
                .order(Terms.Order.term(true));
        SearchResponse response = client.prepareSearch("twitter").setTypes("tweet").addAggregation(fieldAggregation).get();

        Terms terms = response.getAggregations().get("genders");
        for (Terms.Bucket bucket : terms.getBuckets()) {
            System.out.println("key value: " + bucket.getKey());
            System.out.println("docCount value: " + bucket.getDocCount());
        }

Order

        TermsAggregationBuilder fieldAggregation = AggregationBuilders.terms("genders").field("gender.keyword")
                .order(Terms.Order.term(true));
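
Terms.Order offers other orderings too; for example, a sketch sorting buckets by document count, largest first:

        TermsAggregationBuilder byCount = AggregationBuilders.terms("genders").field("gender.keyword")
                .order(Terms.Order.count(false)); // false = descending doc count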

Significant Terms Aggregation

Range Aggregation
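
A minimal sketch against the same twitter/age test data used by the metrics aggregations above:

        RangeAggregationBuilder aggregation = AggregationBuilders.range("agg")
                .field("age")
                .addUnboundedTo(10)     // bucket: age < 10
                .addRange(10, 20)       // bucket: 10 <= age < 20
                .addUnboundedFrom(20);  // bucket: age >= 20
        SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();
        Range agg = sr.getAggregations().get("agg");
        for (Range.Bucket bucket : agg.getBuckets()) {
            System.out.println(bucket.getKeyAsString() + " -> " + bucket.getDocCount());
        }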

Date Range Aggregation

IP Range Aggregation

Histogram Aggregation

Date Histogram Aggregation

Geo Distance Aggregation

GeoHash Grid Aggregation

Query DSL

Match All Query

Matches all documents:

QueryBuilder qb = matchAllQuery();

Full text Query

match Query

Full-text match on a single field, including phrase matching:

QueryBuilder qb = matchQuery("gender", "female");

multi_match Query

Queries multiple fields at once; any number of fields can be given:

QueryBuilder qb = multiMatchQuery("female","gender", "message");

common_terms Query

A query that gives more specialized handling to rarer, more technical terms:

QueryBuilder qb = commonTermsQuery("gender","female");

query_string query查询语句查询

A query using the Lucene query syntax, which allows special operators (AND | OR | NOT):

QueryBuilder qb = queryStringQuery("+male -female");
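
The operators can also be written out explicitly; for example, a sketch using field-qualified terms (field names follow the earlier examples):

QueryBuilder qb = queryStringQuery("user:hhh AND message:Elasticsearch");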

simple_query_string Query

A simpler, more forgiving query syntax:

QueryBuilder qb = simpleQueryStringQuery("+male -female");

Term level Query

Term Query

Finds documents that contain the exact value in the specified field:

QueryBuilder qb = termQuery("gender","male");

Terms Query

Matches multiple exact values within a single field:

QueryBuilder qb = termsQuery("age","10", "20");

Range Query

Matches documents whose field values fall within a range. The builder methods (combined in the sketches below):

  • gte(): matches documents whose field value is greater than or equal to the parameter
  • gt(): matches documents whose field value is strictly greater than the parameter
  • lte(): matches documents whose field value is less than or equal to the parameter
  • lt(): matches documents whose field value is strictly less than the parameter
  • from() sets the start of the range and to() the end; they are used together with includeLower() and includeUpper()
  • includeLower(true): from() matches values greater than or equal to the bound
  • includeLower(false): from() matches values strictly greater than the bound
  • includeUpper(true): to() matches values less than or equal to the bound
  • includeUpper(false): to() matches values strictly less than the bound

QueryBuilder qb = QueryBuilders.rangeQuery("age").gte(10).includeLower(true).lte(20).includeUpper(true);

Here, includeLower() and includeUpper() control whether the bounds of the range are inclusive.
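
An equivalent sketch written with from()/to() instead of gte()/lte():

QueryBuilder qb = QueryBuilders.rangeQuery("age")
        .from(10).includeLower(true) // age >= 10
        .to(20).includeUpper(true);  // age <= 20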

Exists Query

Matches documents that have any value for the specified field:

QueryBuilder qb = existsQuery("user");

Prefix Query

Matches documents whose value in the given field starts with the given exact prefix:

QueryBuilder qb = prefixQuery("gender","m");

Wildcard Query

A wildcard query takes a field name and a wildcard pattern, where ? matches a single character and * matches any number of characters. Wildcard queries run against fields that are not analyzed:

QueryBuilder qb = wildcardQuery("gender","f?*");

Regexp Query

Queries the given field with a regular expression; this also runs against fields that are not analyzed:

QueryBuilder qb = regexpQuery("gender","f.*");

Fuzzy Query

A fuzzy query takes an exact field name and query text that may be misspelled:

QueryBuilder qb = fuzzyQuery("gender","mala").fuzziness(Fuzziness.ONE);

Type Query

Matches documents of the specified type:

QueryBuilder qb = typeQuery("tweet");

Ids Query

Queries by type and id; the type may be omitted (see the sketch below):

QueryBuilder qb = idsQuery("tweet").addIds("1", "11");
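
The type-less form mentioned above, as a sketch:

QueryBuilder qb = idsQuery().addIds("1", "11");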
