
Elasticsearch Series---Java Client Code Demo

1黄鹰 • 5 years ago • 572 views

Preface

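Across the previous 33 articles, every request to ES was issued as a RESTful call from Kibana. No Java or Python client code has been shown so far. RESTful requests are the most direct way to use and understand the core features of ES. In real projects, however, ES requests are issued and their data processed from Java or Python. Once the core features covered earlier are understood, the Java API is straightforward to use. For the APIs not covered here, consult the official documentation.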

Overview

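This article walks through examples of Elasticsearch client API development, mainly in Java, and covers the most commonly used core APIs. The examples use the TransportClient that matches the 6.3.1 dependencies below. Note that TransportClient was deprecated in 7.0 and removed in 8.0.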

Code Examples

Adding Dependencies

Taking a Maven project as an example, add the following dependencies:

<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch</artifactId>
	<version>6.3.1</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>transport</artifactId>
	<version>6.3.1</version>
</dependency>
<dependency>
	<groupId>log4j</groupId>
	<artifactId>log4j</artifactId>
	<version>1.2.17</version>
</dependency>
<dependency>
	<groupId>org.apache.logging.log4j</groupId>
	<artifactId>log4j-core</artifactId>
	<version>2.12.1</version>
</dependency>

Connecting to ES

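  1. Create a Settings object that specifies the cluster name.
  2. Create a TransportClient object and specify the IP address and port manually (9300 is the default transport port, not the 9200 HTTP port).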
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
// TransportAddress replaces InetSocketTransportAddress, which was removed in 6.0
TransportClient client = new PreBuiltTransportClient(settings)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

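When the cluster has many nodes, specifying the IP and port of every node is impractical. Instead, enable the client's node sniffing so that it discovers the other cluster nodes automatically: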

// Setting client.transport.sniff to true enables automatic discovery of cluster nodes
Settings settings = Settings.builder()
        .put("client.transport.sniff", true)
        .put("cluster.name", "elasticsearch")
        .build();

// Only one node needs to be specified; the rest are discovered by sniffing
TransportClient client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));

Basic CRUD

The most basic CRUD code, which works well as an introductory demo:

/**
 * Create an employee (index a document)
 * @param client
 * @throws Exception
 */
private static void createEmployee(TransportClient client) throws Exception {
	IndexResponse response = client.prepareIndex("company", "employee", "1")
			.setSource(XContentFactory.jsonBuilder()
					.startObject()
						.field("name", "jack")
						.field("age", 27)
						.field("position", "technique")
						.field("country", "china")
						.field("join_date", "2017-01-01")
						.field("salary", 10000)
					.endObject())
			.get();
	System.out.println(response.getResult());
}

/**
 * Get an employee
 * @param client
 * @throws Exception
 */
private static void getEmployee(TransportClient client) throws Exception {
	GetResponse response = client.prepareGet("company", "employee", "1").get();
	System.out.println(response.getSourceAsString());
}

/**
 * Update an employee (partial update of the position field)
 * @param client
 * @throws Exception
 */
private static void updateEmployee(TransportClient client) throws Exception {
	UpdateResponse response = client.prepareUpdate("company", "employee", "1")
			.setDoc(XContentFactory.jsonBuilder()
					.startObject()
						.field("position", "technique manager")
					.endObject())
			.get();
	System.out.println(response.getResult());
}

/**
 * Delete an employee
 * @param client
 * @throws Exception
 */
private static void deleteEmployee(TransportClient client) throws Exception {
	DeleteResponse response = client.prepareDelete("company", "employee", "1").get();
	System.out.println(response.getResult());
}

Search

The searches we previously ran as RESTful requests can be written in Java instead. Here is the original RESTful example:

GET /company/employee/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "position": "technique"
          }
        }
      ],
      "filter": {
        "range": {
          "age": {
            "gte": 30,
            "lte": 40
          }
        }
      }
    }
  },
  "from": 0,
  "size": 1
}

The equivalent Java code:

SearchResponse response = client.prepareSearch("company")
        .setTypes("employee")
        .setQuery(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("position", "technique"))     // query: match on position
                .filter(QueryBuilders.rangeQuery("age").gte(30).lte(40)))    // filter: 30 <= age <= 40
        .setFrom(0).setSize(1)
        .get();
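To make the semantics of this bool query concrete, here is a minimal in-memory sketch in plain Java with no Elasticsearch involved. `Employee` and `BoolSearchSketch` are hypothetical names. The `match` query is only approximated by lowercase whitespace tokenization, and relevance scoring is not modelled, so hits come back in list order rather than sorted by score as ES would return them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical document type holding the fields the query touches
class Employee {
    final String name;
    final String position;
    final int age;

    Employee(String name, String position, int age) {
        this.name = name;
        this.position = position;
        this.age = age;
    }
}

class BoolSearchSketch {
    // Rough stand-in for a match query: lowercase, split on whitespace,
    // and succeed if any query token appears among the field's tokens
    static boolean matches(String fieldValue, String queryText) {
        List<String> fieldTokens = Arrays.asList(fieldValue.toLowerCase(Locale.ROOT).split("\\s+"));
        for (String token : queryText.toLowerCase(Locale.ROOT).split("\\s+")) {
            if (fieldTokens.contains(token)) {
                return true;
            }
        }
        return false;
    }

    // must: match on position; filter: gte <= age <= lte; then skip `from` hits and keep at most `size`
    static List<String> search(List<Employee> docs, String position, int gte, int lte, int from, int size) {
        return docs.stream()
                .filter(e -> matches(e.position, position))
                .filter(e -> e.age >= gte && e.age <= lte)
                .skip(from)
                .limit(size)
                .map(e -> e.name)
                .collect(Collectors.toList());
    }
}
```

Because `match` is analyzed, a position of "technique manager" also matches the query text "technique", which the token-based check reproduces.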

Aggregation Queries

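Aggregations are a little more involved. Both building the request and parsing the response have to follow the nested structure that the query actually returns. Take the following query as an example.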

Requirements:

  1. Group by country.
  2. Within each country group, group again by the year of joining (join_date).
  3. Finally, compute the average salary of each group.
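The three requirements amount to a two-level grouping followed by an average. A minimal plain-Java sketch with Java streams shows what the aggregation computes, using an in-memory list instead of ES. `Hire` and `AggregationSketch` are hypothetical names. The sketch ignores ES-specific details. By default, a `terms` aggregation returns only the top 10 buckets, and `date_histogram` also emits empty buckets for years with no hires between the first and last year.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical record of the three fields the aggregation uses
class Hire {
    final String country;
    final String joinDate; // ISO-8601 date such as "2017-01-01"
    final double salary;

    Hire(String country, String joinDate, double salary) {
        this.country = country;
        this.joinDate = joinDate;
        this.salary = salary;
    }
}

class AggregationSketch {
    // country -> year of join_date -> average salary,
    // the same nesting as terms -> date_histogram (year) -> avg
    static Map<String, Map<Integer, Double>> avgSalaryByCountryAndYear(List<Hire> hires) {
        return hires.stream().collect(Collectors.groupingBy(
                (Hire h) -> h.country,
                Collectors.groupingBy(
                        (Hire h) -> LocalDate.parse(h.joinDate).getYear(),
                        Collectors.averagingDouble((Hire h) -> h.salary))));
    }
}
```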

The RESTful request:

GET /company/employee/_search
{
  "size": 0,
  "aggs": {
    "group_by_country": {
      "terms": {
        "field": "country"
      },
      "aggs": {
        "group_by_join_date": {
          "date_histogram": {
            "field": "join_date",
            "interval": "year"
          },
          "aggs": {
            "avg_salary": {
              "avg": {
                "field": "salary"
              }
            }
          }
        }
      }
    }
  }
}

The same request written in Java:

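// Aggregation names must match the ones used when parsing the response below
SearchResponse searchResponse = client.prepareSearch("company")
    .setTypes("employee")
    .setSize(0)   // only the aggregation results are needed, not the hits
    .addAggregation(
        AggregationBuilders.terms("group_by_country").field("country")
        .subAggregation(AggregationBuilders.dateHistogram("group_by_join_date")
            .field("join_date")
            .dateHistogramInterval(DateHistogramInterval.YEAR)
            .subAggregation(AggregationBuilders.avg("avg_salary").field("salary"))
        )
    )
    .execute().actionGet();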

The response then has to be unpacked level by level:

// Terms, Histogram and Avg come from the org.elasticsearch.search.aggregations packages
Terms groupByCountry = searchResponse.getAggregations().get("group_by_country");
for (Terms.Bucket countryBucket : groupByCountry.getBuckets()) {
	System.out.println(countryBucket.getKeyAsString() + "\t" + countryBucket.getDocCount());

	Histogram groupByJoinDate = countryBucket.getAggregations().get("group_by_join_date");
	for (Histogram.Bucket joinDateBucket : groupByJoinDate.getBuckets()) {
		System.out.println(joinDateBucket.getKeyAsString() + "\t" + joinDateBucket.getDocCount());

		Avg avgSalary = joinDateBucket.getAggregations().get("avg_salary");
		System.out.println(avgSalary.getValue());
	}
}

client.close();

Upsert Request

private static void upsert(TransportClient transport) {
	try {
		IndexRequest index = new IndexRequest("book_shop", "books", "2").source(
				XContentFactory.jsonBuilder().startObject()
						.field("name", "mysql从入门到删库跑路")
						.field("tags", "mysql")
						.field("price", 32.8)
						.endObject());

		UpdateRequest update = new UpdateRequest("book_shop", "books", "2")
				.doc(XContentFactory.jsonBuilder()
						.startObject().field("price", 31.8)
						.endObject())
				.upsert(index);
		UpdateResponse response = transport.update(update).get();
		System.out.println(response.getVersion());
	} catch (IOException e) {
		e.printStackTrace();
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	}
}
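An upsert applies the partial `doc` when the document already exists and indexes the `upsert` document when it does not. A minimal in-memory sketch of that decision follows, with a plain map standing in for the index. `UpsertSketch` is a hypothetical name. Its merge is shallow (`putAll`), whereas ES merges nested objects in a partial update recursively.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory store: document ID -> source fields
class UpsertSketch {
    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    // Absent ID: index a copy of upsertDoc. Existing ID: merge partialDoc into the stored source.
    void upsert(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            docs.put(id, new HashMap<>(upsertDoc));
        } else {
            existing.putAll(partialDoc);
        }
    }

    Map<String, Object> get(String id) {
        return docs.get(id);
    }
}
```

Running the book example twice against this sketch shows why the price starts at 32.8 (first call, document created) and becomes 31.8 (second call, partial update applied).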

Multi-get (mget) Request

public static void mget(TransportClient transport) {
	MultiGetResponse res = transport.prepareMultiGet()
			.add("book_shop", "books", "1")
			.add("book_shop", "books", "2")
			.get();
	for (MultiGetItemResponse item : res.getResponses()) {
		System.out.println(item.getResponse());
	}
}

Bulk Request

public static void bulk(TransportClient transport) {
	try {
		BulkRequestBuilder bulk = transport.prepareBulk();
		bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(
				XContentFactory.jsonBuilder().startObject()
						.field("name", "设计模式从入门到拷贝代码")
						.field("tags", "设计模式")
						.field("price", 55.9)
						.endObject()));
		bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(
				XContentFactory.jsonBuilder().startObject()
						.field("name", "架构设计从入门到google搜索")
						.field("tags", "架构设计")
						.field("price", 68.9)
						.endObject()));
		bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc(XContentFactory.jsonBuilder()
				.startObject().field("price", 32.8)
				.endObject()));

		BulkResponse bulkRes = bulk.get();
		if (bulkRes.hasFailures()) {
			// Print the failure reason of each failed item
			System.out.println(bulkRes.buildFailureMessage());
		}
	} catch (IOException e) {
		e.printStackTrace();
	}
}
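The example above builds one bulk request by hand. With many documents, a common approach is to send them in fixed-size batches rather than as one oversized request. A minimal sketch of just the batching step in plain Java follows. `BulkBatcher` and `partition` are hypothetical names, and each returned batch would become one `prepareBulk()` call. The client also ships a `BulkProcessor` class that handles batching and flushing for you.

```java
import java.util.ArrayList;
import java.util.List;

class BulkBatcher {
    // Split items into consecutive batches of at most batchSize elements, preserving order
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```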

Scroll Request

public static void scroll(TransportClient client) {
	SearchResponse bookShop = client.prepareSearch("book_shop").setScroll(new TimeValue(60000)).setSize(1).get();

	int batchCnt = 0;
	do {
		// Process the current batch, then fetch the next one with the scroll ID until a batch comes back empty
		System.out.println("batchCnt:" + ++batchCnt);
		for (SearchHit hit : bookShop.getHits().getHits()) {
			System.out.println(hit.getSourceAsString());
		}
		bookShop = client.prepareSearchScroll(bookShop.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
	} while (bookShop.getHits().getHits().length != 0);

	// Release the scroll context on the server instead of waiting for it to time out
	client.prepareClearScroll().addScrollId(bookShop.getScrollId()).get();
}
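The termination logic of a scroll loop can be checked without a cluster. In the minimal sketch below, `FakeScroll` is a hypothetical stand-in for the server-side scroll context. It hands out consecutive pages of hits and then returns empty pages once they run out, which is the condition the loop above relies on to stop. `ScrollLoopSketch` is likewise a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a scroll context: each call returns the next page
// of at most `size` hits, and empty pages once all hits have been handed out
class FakeScroll {
    private final List<String> hits;
    private final int size;
    private int position = 0;

    FakeScroll(List<String> hits, int size) {
        this.hits = hits;
        this.size = size;
    }

    List<String> nextBatch() {
        int end = Math.min(position + size, hits.size());
        List<String> batch = new ArrayList<>(hits.subList(position, end));
        position = end;
        return batch;
    }
}

class ScrollLoopSketch {
    // Process a batch, fetch the next one, stop at the first empty batch.
    // Returns the number of non-empty batches processed.
    static int drain(FakeScroll scroll, List<String> sink) {
        int batchCnt = 0;
        List<String> batch = scroll.nextBatch();
        while (!batch.isEmpty()) {
            batchCnt++;
            sink.addAll(batch);
            batch = scroll.nextBatch();
        }
        return batchCnt;
    }
}
```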

Search Templates

public static void searchTemplates(TransportClient client) {
	Map<String,Object> params = new HashMap<>(10);
	params.put("from",0);
	params.put("size",10);
	params.put("tags","java");

	SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)
			.setScript("page_query_by_tags")
			.setScriptType(ScriptType.STORED)
			.setScriptParams(params)
			.setRequest(new SearchRequest())
			.get();

	for(SearchHit hit:str.getResponse().getHits().getHits()) {
		System.out.println(hit.getSourceAsString());
	}
}
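With `ScriptType.STORED`, the template `page_query_by_tags` must already be stored in the cluster, and ES fills its Mustache `{{...}}` variables from `params` before running the search. The article does not show that template's body, so the template string in the usage below is hypothetical. The sketch performs only plain `{{name}}` substitution, as Mustache renders missing variables as empty. It ignores other Mustache features such as escaping and sections. `TemplateSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TemplateSketch {
    private static final Pattern VAR = Pattern.compile("\\{\\{(\\w+)\\}\\}");

    // Replace each {{name}} with params.get(name); unknown names become ""
    static String render(String template, Map<String, Object> params) {
        Matcher m = VAR.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            Object value = params.get(m.group(1));
            m.appendReplacement(out, Matcher.quoteReplacement(value == null ? "" : value.toString()));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With the `from`, `size` and `tags` parameters from the example, a hypothetical template such as `{"query":{"match":{"tags":"{{tags}}"}},"from":{{from}},"size":{{size}}}` would render to a query matching `java` with `from` 0 and `size` 10.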

Compound Queries with Multiple Conditions

public static void otherSearch(TransportClient client) {
	SearchResponse response1 = client.prepareSearch("book_shop").setQuery(QueryBuilders.termQuery("tags", "java")).get();
	SearchResponse response2 = client.prepareSearch("book_shop").setQuery(QueryBuilders.multiMatchQuery("32.8", "price","tags")).get();
	SearchResponse response3 = client.prepareSearch("book_shop").setQuery(QueryBuilders.commonTermsQuery("name", "入门")).get();
	SearchResponse response4 = client.prepareSearch("book_shop").setQuery(QueryBuilders.prefixQuery("name", "java")).get();

	System.out.println(response1.getHits().getHits()[0].getSourceAsString());
	System.out.println(response2.getHits().getHits()[0].getSourceAsString());
	System.out.println(response3.getHits().getHits()[0].getSourceAsString());
	System.out.println(response4.getHits().getHits()[0].getSourceAsString());

	// Combine multiple conditions in one bool query
	SearchResponse response5 = client.prepareSearch("book_shop").setQuery(QueryBuilders.boolQuery()
			.must(QueryBuilders.termQuery("tags", "java"))
			.mustNot(QueryBuilders.matchQuery("name", "跑路"))
			.should(QueryBuilders.matchQuery("name", "入门"))
			.filter(QueryBuilders.rangeQuery("price").gte(23).lte(55))).get();

	System.out.println(response5.getHits().getHits()[0].getSourceAsString());
}
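The bool query above mixes all four clause types. In query context, a document must satisfy every `must` and `filter` clause and no `must_not` clause. When `must` or `filter` clauses are present, `should` clauses are optional and only raise the score. Without them, at least one `should` clause has to match. A minimal predicate-based sketch of that matching rule follows. `BoolFilterSketch` is a hypothetical name, and scoring is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical predicate-based model of bool query matching (no scoring)
class BoolFilterSketch<T> {
    private final List<Predicate<T>> must = new ArrayList<>();
    private final List<Predicate<T>> mustNot = new ArrayList<>();
    private final List<Predicate<T>> should = new ArrayList<>();
    private final List<Predicate<T>> filter = new ArrayList<>();

    BoolFilterSketch<T> must(Predicate<T> p) { must.add(p); return this; }
    BoolFilterSketch<T> mustNot(Predicate<T> p) { mustNot.add(p); return this; }
    BoolFilterSketch<T> should(Predicate<T> p) { should.add(p); return this; }
    BoolFilterSketch<T> filter(Predicate<T> p) { filter.add(p); return this; }

    boolean matches(T doc) {
        // every must and filter clause has to hold, and no must_not clause may hold
        boolean required = must.stream().allMatch(p -> p.test(doc))
                && filter.stream().allMatch(p -> p.test(doc))
                && mustNot.stream().noneMatch(p -> p.test(doc));
        // should is optional when must/filter clauses exist; otherwise at least one must hold
        boolean shouldRequired = must.isEmpty() && filter.isEmpty() && !should.isEmpty();
        return required && (!shouldRequired || should.stream().anyMatch(p -> p.test(doc)));
    }
}
```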

Geo Queries

public static void geo(TransportClient client) {
	// Bounding box with corners top = 23, left = 112, bottom = 21, right = 114
	GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location").setCorners(23, 112, 21, 114);

	// Polygon defined by three (lat, lon) vertices
	List<GeoPoint> points = new ArrayList<>();
	points.add(new GeoPoint(23, 115));
	points.add(new GeoPoint(25, 113));
	points.add(new GeoPoint(21, 112));
	GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location", points);

	// Everything within 500 m of the given point
	GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location").point(22.523375, 113.911231).distance(500, DistanceUnit.METERS);

	// Any of query1, query2 or query3 can be passed to setQuery()
	SearchResponse response = client.prepareSearch("location").setQuery(query3).get();
	for (SearchHit hit : response.getHits().getHits()) {
		System.out.println(hit.getSourceAsString());
	}
}
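The geo_distance query keeps documents whose point lies within the given distance of the center, and the bounding-box query keeps points inside the four corners. The plain-Java sketch below reproduces both checks. The distance uses the haversine great-circle formula, and its Earth radius constant is an assumption, so ES's own arc computation may differ slightly. The box test uses the same top/left/bottom/right order as `setCorners()` and does not handle boxes that cross the 180° meridian. `GeoSketch` and its methods are hypothetical names.

```java
class GeoSketch {
    // Mean Earth radius in meters (assumed value, not taken from Elasticsearch)
    private static final double EARTH_RADIUS_METERS = 6371008.8;

    // Haversine great-circle distance in meters between two points given in degrees
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_METERS * Math.asin(Math.sqrt(a));
    }

    // Same idea as geoDistanceQuery(...).point(lat, lon).distance(meters, METERS)
    static boolean withinDistance(double centerLat, double centerLon, double lat, double lon, double meters) {
        return haversineMeters(centerLat, centerLon, lat, lon) <= meters;
    }

    // Same corner order as setCorners(top, left, bottom, right); no antimeridian handling
    static boolean inBoundingBox(double top, double left, double bottom, double right, double lat, double lon) {
        return lat <= top && lat >= bottom && lon >= left && lon <= right;
    }
}
```

For example, a point 0.003 degrees of latitude north of the center in `query3` is roughly 334 m away and passes the 500 m check, while one 0.005 degrees away (about 556 m) does not.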

Summary

A quick skim of the demos above is enough. If you are already developing an ES-related project, rely more on the official API documentation: www.elastic.co/guide/en/el…

Focused on Java high concurrency and distributed architecture. For more technical articles and experience sharing, follow the WeChat official account Java架构社区.


Article URL: http://www.python88.com/topic/64050