社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

ElasticSearch 使用 High Level REST Client 实现搜索等功能实战

程序员大咖 • 3 年前 • 277 次点击  
👇👇关注后回复 “进群” ,拉你进程序员交流群👇👇
作者丨idea
来源丨Java知音(ID:Java_friends)


ES 全称 Elasticsearch 是一款分布式的全文搜索引擎,在互联网公司中,这款搜索引擎一直被程序员们所推崇。常见的使用场景如ELK日志分析,电商APP的商品推荐,社交APP的同城用户推荐等等。

在ES的官网文档中,目前主要提供了两种方式访问,一种叫做Low Client,一种叫做High Level Rest Client。在今天这篇文章中,我们主要介绍High Level Rest Client的使用方式和一些经验分享。

ES操作记录

那么我们该如何去通过High Level Rest Client的方式来使用es呢?来看接下来的这块实战案例。

首先我们需要合理的es配置依赖,下边这份是对应的pom文件配置:

<dependency>
    <groupId>org.elasticsearch.clientgroupId>
    <artifactId>elasticsearch-rest-high-level-clientartifactId>
    <version>5.6.11version>
dependency>
<dependency>
    <groupId>org.elasticsearchgroupId>
    <artifactId>elasticsearchartifactId>
    <version>5.6.11version>
dependency>

在配置中指定了es依赖之后,我们开始定义一个用于测试es增删查改操作的对象类UserSearchRecordPO。

@EsDeclare(index = "user_search")
public class UserSearchRecordPO {
    @Id
    private long id;
    private String username;
    private String searchKeyWord;
    public long getId() {
        return id;
    }
     public void setId(long id) {
        this.id = id;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getSearchKeyWord() {
        return searchKeyWord;
    }
    public void setSearchKeyWord(String searchKeyWord) {
        this.searchKeyWord = searchKeyWord;
    }
}

在UserSearchRecordPO这个对象的头部我用了一个自定义的注解:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EsDeclare {


    String index() default StringUtils.EMPTY;
}

这个注解用于声明对象所映射的文档具体名称。

奇怪,为什么我们要声明这个注解呢?嘿嘿,别着急,在下边的这个EsDao中就有使用到这个注解的影子了。

在ESDao中,我的整体设计思路是,通过反射获取一个Bean对象是否携带有@EsDeclare注解,如果有,就从注解中提取对应的topic。这部分的核心逻辑如下所示:

/**
 * 获取topic和type
 *
 * @param clz
 * @return
 */

private Pair/* topic */, String/* type */> getTopicAndType(Class> clz) {
    //通过反射去获取注解中的index值
    EsDeclare esDeclare = clz.getAnnotation(EsDeclare.class);
    if (null == esDeclare || StringUtils.isEmpty(esDeclare.index())) {
        logger.warn("getTopicAndType , esDeclare is illegal , class:{}", clz);
        return null;
    }
    return Pair.of(esDeclare.index(), clz.getSimpleName());
}

这里有几个概念需要和大家简单梳理下,关于index,type,document三个概念的含义:

  • index可以类比为MySQL中的表这个概念,他是一类型数据存储的集合。
  • document其实就是index这个集合里面单条数据的一种称呼,这个概念和MySQL中的行记录比较类似。
  • type是这个代表document属于index中的哪个类别(type),一个index通常会划分为多个type,逻辑上对index中有些许不同的几类数据进行分类:因为一批相同的数据,可能有很多相同的fields,但是还是可能会有一些轻微的不同,可能会有少数fields是不一样的,举个例子,就比如说,商品,可能划分为电子商品,生鲜商品,日化商品,等等。

三者的关系如下图所示:

好了,现在让我们再来看看基于ES进行CRUD该如何执行操作,具体代码见下边这个类:

package org.idea.es.project.template.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.idea.es.project.template.api.config.EsDeclare;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.annotation.Id;
import org.springframework.stereotype.Repository;
import javax.naming.directory.SearchResult;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
@Repository
public class EsDao<T{
    private final Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private RestHighLevelClient restHighLevelClient;


    /**
     * 条件查询
     *
     * @return
     */

    public List searchByCondition(String index, String type) {
        try {
            SearchRequest searchRequest = new SearchRequest(index).types(type);
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.from(1).size(2);
            searchRequest.source(sourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
            System.out.println(searchResponse);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }


    public T queryOne(SearchSourceBuilder sourceBuilder, Class clz) {
        List resultList = this.queryList(sourceBuilder, clz);
        if (CollectionUtils.isNotEmpty(resultList)) {
            return resultList.get(0);
        }
        return null;
    }
    /**
     * 查询
     *
     * @param sourceBuilder
     * @param clz
     * @return
     */

    public List queryList(SearchSourceBuilder sourceBuilder, Class clz) {
        Pair topicAndType = getTopicAndType(clz);
        if (null == topicAndType) {
            logger.warn("query , null topicAndType , clz:{}", clz);
            return Collections.emptyList();
        }
        Field idField = getIdField(clz);
        if (null == idField) {
            logger.warn("query , null id field , clz:{}", clz);
            return Collections.emptyList();
        }
        try {
            SearchRequest searchRequest = new SearchRequest(topicAndType.getLeft()).types(topicAndType.getRight()).source(sourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
            SearchHit[] hits = searchResponse.getHits().getHits();
            List result = Lists.newArrayListWithCapacity(hits.length);
            for (SearchHit hit : hits) {
                T obj = JSON.parseObject(hit.getSourceAsString(), clz, Feature.AllowISO8601DateFormat);
                Object idObj = FieldUtils.readField(idField, obj, true);
                if (null == idObj) {
                    FieldUtils.writeField(idField, obj, hit.getId(), true);
                }
                result.add(obj);
            }
            return result;
        } catch (Exception e) {
            logger.warn("query , e:{}", e.getMessage());
            return Collections.emptyList();
        }
    }
    /**
     * 插入或者更新,根据id字段来判断是否已有数据
     *
     * @param po
     */

    public void saveOrUpdate(T po) {
        if (po == null) {
            throw new IllegalArgumentException("po can not be null!");
        }
        try {
            Pair/*topic*/, String /*type*/> pair = getTopicAndType(po.getClass());
            Field idField = getIdField(po.getClass());
            idField.setAccessible(true);
            Object idObj = idField.get(po);
            IndexRequest indexRequest = new IndexRequest(pair.getLeft(), pair.getRight(), idObj == null ? null : idObj.toString());
            IndexResponse indexResponse = restHighLevelClient.index(indexRequest.source(JSON.toJSONStringWithDateFormat(po, "yyyy-MM-dd'T'HH:mm:ss+08:00" ), XContentType.JSON));
            System.out.println(indexResponse);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 删除单个元素
     *
     * @param po
     */

    public void deleteOne(T po) {
        if (po == null) {
            throw new IllegalArgumentException("po can not be null!");
        }
        try {
            Pair/*index*/, String /*type*/> pair = getTopicAndType(po.getClass());
            Field idField = getIdField(po.getClass());
            idField.setAccessible(true);
            Object idObj = idField.get(po);
            DeleteRequest deleteRequest = new DeleteRequest(pair.getLeft(), pair.getRight(), idObj == null ? null : idObj.toString());
            restHighLevelClient.delete(deleteRequest);
        } catch (IllegalAccessException | IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 根据id删除
     *
     * @param index
     * @param type
     * @param _id
     */

    public void deleteBy_Id(String index,String type,String _id){
        DeleteRequest deleteRequest = new DeleteRequest(index, type, _id);
        try {
            restHighLevelClient.delete(deleteRequest);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 获取id的域
     *
     * @param clz
     * @return
     */

    public Field getIdField(Class> clz) {
        List listWithAnnotation = FieldUtils.getFieldsListWithAnnotation(clz, Id.class);
        if (listWithAnnotation.size() != 1) {
            logger.warn("getIdField , id is illeage , class:{}", clz);
            return null;
        }
        return listWithAnnotation.get(0);
    }
    /**
     * 获取topic和type
     *
     * @param clz
     * @return
     */

    private Pair/* topic */, String/* type */> getTopicAndType(Class> clz) {
        EsDeclare esDeclare = clz.getAnnotation(EsDeclare.class);
        if (null == esDeclare || StringUtils.isEmpty(esDeclare.index())) {
            logger.warn("getTopicAndType , esDeclare is illegal , class:{}", clz);
            return null;
        }
        return Pair.of(esDeclare.index(), clz.getSimpleName());
    }
}

这里需要注意下saveOrUpdate函数中,它会根据传入的对象参数中带有 @Id 注解的字段值去判断是否已经有具体数据,如果有的话则只做更新操作,反之就是插入操作。这一点就有点类似于MySQL的insertOrUpdate方法。

接下来就是对于我们所定义的对象实现crud操作了,下边是对应的service接口和相关的实现类,这部分的代码如下所示:

首先是接口部分的定义:

package org.idea.es.project.template.api.service;


import org.idea.es.project.template.api.bo.UserSearchRecordPO;


import javax.naming.directory.SearchResult;
import java.util.List;


public interface IUserSearchRecordService {


    /**
     * 条件查询
     *
     * @param index
     * @param type
     * @return
     */

    List searchByCondition(String index,String type);


    /**
     * 查询操作
     *
     * @param userSearchRecordPO
     * @return
     */

    UserSearchRecordPO queryByParam(UserSearchRecordPO userSearchRecordPO);


    /**
     * 写入记录
     *
     * @return
     */

    UserSearchRecordPO saveOrUpdate();


    /**
     * 删除单个元素
     */

    void deleteOne(UserSearchRecordPO userSearchRecordPO);




}

接着是对应的service实现类部分:

package org.idea.es.project.template.api.service.impl;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.idea.es.project.template.api.bo.UserSearchRecordPO;
import org.idea.es.project.template.api.service.EsDao;
import org.idea.es.project.template.api.service.IUserSearchRecordService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.naming.directory.SearchResult;
import java.util.List;
@Service
public class UserSearchRecordServiceImpl implements IUserSearchRecordService {
    @Resource
    private EsDao esDao;
    @Override
    public List searchByCondition(String index,String type) {
        return esDao.searchByCondition(index,type);
    }
    @Override
    public UserSearchRecordPO queryByParam(UserSearchRecordPO userSearchRecordPO) {
        try {
            BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
                    .filter(QueryBuilders.termQuery("id", userSearchRecordPO.getId()));
            SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().query(queryBuilder).from(0).size(1);
            return esDao.queryOne(sourceBuilder, UserSearchRecordPO.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    @Override
    public UserSearchRecordPO saveOrUpdate() {
        UserSearchRecordPO userSearchRecordPO = new UserSearchRecordPO();
        userSearchRecordPO.setId(System.currentTimeMillis());
        userSearchRecordPO.setUsername("idea");
        userSearchRecordPO.setSearchKeyWord("key-word");
        esDao.saveOrUpdate(userSearchRecordPO);
        return userSearchRecordPO;
    }
    @Override
    public void deleteOne(UserSearchRecordPO userSearchRecordPO) {
        esDao.deleteOne(userSearchRecordPO);
    }
}

最后是供外界调用的controller方法:

package org.idea.es.project.template.api.controller;
import org.idea.es.project.template.api.bo.UserSearchRecordPO;
import org.idea.es.project.template.api.service.IUserSearchRecordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/user-search-record")
public class UserSearchRecordController {
    @Autowired
    private IUserSearchRecordService iUserSearchRecordService;


    @GetMapping(value = "/save-or-update")
    public boolean saveOrUpdate(){
        iUserSearchRecordService.saveOrUpdate();
        System.out.println("success");
        return true;
    }


    @GetMapping(value = "/query-by-param")
    public UserSearchRecordPO queryByParam(Long id){
        UserSearchRecordPO userSearchRecordPO = new UserSearchRecordPO();
        userSearchRecordPO.setId(id);
        return iUserSearchRecordService.queryByParam(userSearchRecordPO);
    }


    @GetMapping (value = "/delete-one")
    public boolean deleteOne(long id){
        UserSearchRecordPO userSearchRecordPO = new UserSearchRecordPO();
        userSearchRecordPO.setId(id);
        iUserSearchRecordService.deleteOne(userSearchRecordPO);
        System.out.println("success");
        return true;
    }
}

将SpringBoot启动之后,分别触发这些http请求接口,就可以验证crud操作的正确性了。

好了。

另外,在测试es的时候,我们可以通过使用 elasticsearch-head 这款插件去查看es内部的数据是否符合我们的预期。

整体来说,通过 elasticsearch-rest-high-level-client 去访问es还是比较容易上手的。另外在实际业务场景中,如果遇到一些非常复杂的条件查询功能的话,自Elasticsearch 5.x之后,我们其实还可以通过使用painless脚本去操作es,可以看出es的功能在变得越来越强大了。


-End-

最近有一些小伙伴,让我帮忙找一些 面试题 资料,于是我翻遍了收藏的 5T 资料后,汇总整理出来,可以说是程序员面试必备!所有资料都整理到网盘了,欢迎下载!

点击👆卡片,关注后回复【面试题】即可获取

在看点这里好文分享给更多人↓↓

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/136802