好客租房-12.ES接入java
1. java语言操作ES
在Elasticsearch中,为Java提供了2种客户端:一种是REST风格的客户端,另一种是Java API的客户端。
1.1 Rest风格Java客户端
Elasticsearch提供了2种REST客户端,一种是低级客户端,一种是高级客户端。
Java Low Level REST Client:官方提供的低级客户端。该客户端通过http来连接Elasticsearch集群。用户在使用该客户端时需要将请求数据手动拼接成Elasticsearch所需JSON格式进行发送,收到响应时同样也需要将返回的JSON数据手动封装成对象。虽然麻烦,不过该客户端兼容所有的Elasticsearch版本。
Java High Level REST Client:官方提供的高级客户端。该客户端基于低级客户端实现,它提供了很多便捷的API来解决低级客户端需要手动转换数据格式的问题。
1.1.1 构造数据
POST http://ip:9200/haoke/house/_bulk
{"index":{"_index":"haoke","_type":"house"}}
{"id":"1001","title":"整租 · 南丹大楼 1居室 7500","price":"7500"}
{"index":{"_index":"haoke","_type":"house"}}
{"id":"1002","title":"陆家嘴板块,精装设计一室一厅,可拎包入住诚意租。","price":"8500"}
{"index":{"_index":"haoke","_type":"house"}}
{"id":"1003","title":"整租 · 健安坊 1居室 4050","price":"7500"}
{"index":{"_index":"haoke","_type":"house"}}
{"id":"1004","title":"整租 · 中凯城市之光+视野开阔+景色秀丽+拎包入住","price":"6500"}
{"index":{"_index":"haoke","_type":"house"}}
{"id":"1005","title":"整租 · 南京西路品质小区 21213三轨交汇 配套齐* 拎包入住","price":"6000"}
{"index":{"_index":"haoke","_type":"house"}}
{"id":"1006","title":"祥康里 简约风格 *南户型 拎包入住 看房随时","price":"7000"}
1.1.2 创建工程
创建工程itcast-elasticsearch
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>itcast-elasticsearch</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- Low-level REST client jar.
             (An XML comment must not contain a double hyphen, so the delimiters are plain.) -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.5.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.4</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Java compiler plugin: build for Java 8 with UTF-8 sources. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
低级别单测代码如下:
package cn.itcast.es.rest;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* 低级别rest风格测试
*/
/**
 * Demonstrates the Elasticsearch low-level REST client.
 *
 * <p>Every test builds a raw {@link Request}, sends it through {@link RestClient} and prints
 * the HTTP status line followed by the response body. Nothing is asserted; the tests are
 * meant to be run against a live cluster and read from the console.
 */
public class TestESREST {

    /** Converts plain Java objects (here: maps) to JSON request bodies. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Low-level REST client, created before and closed after each test. */
    private RestClient restClient;

    /**
     * Builds the REST client used by the tests.
     *
     * <p>{@code RestClient.builder} takes one or more {@link HttpHost} arguments, so the
     * nodes of a multi-node cluster can be listed separated by commas.
     */
    @Before
    public void init() {
        RestClientBuilder restClientBuilder = RestClient.builder(
                new HttpHost("你的ES的ip", 9200, "http"));
        restClientBuilder.setFailureListener(new RestClient.FailureListener() {
            @Override
            public void onFailure(Node node) {
                System.out.println("出错了 -> " + node);
            }
        });
        this.restClient = restClientBuilder.build();
    }

    /**
     * Closes the REST client.
     *
     * @throws IOException if closing the client fails
     */
    @After
    public void after() throws IOException {
        // JUnit runs @After even when @Before threw, in which case the client was never built.
        if (this.restClient != null) {
            this.restClient.close();
        }
    }

    /**
     * Fetches the cluster state.
     *
     * @throws IOException if the request fails
     */
    @Test
    public void testGetInfo() throws IOException {
        // The low-level client mirrors plain HTTP: a Request goes out, a Response comes back.
        Request request = new Request("GET", "/_cluster/state");
        request.addParameter("pretty", "true");
        executeAndPrint(request);
    }

    /**
     * Indexes a new document into {@code haoke/house}.
     *
     * @throws IOException if serialization or the request fails
     */
    @Test
    public void testCreateData() throws IOException {
        Request request = new Request("POST", "/haoke/house");
        // Document fields to be sent as the request body.
        Map<String, Object> data = new HashMap<>();
        data.put("id", "2001");
        data.put("title", "张江高科");
        data.put("price", "3500");
        // Serialize the map to a JSON string.
        request.setJsonEntity(MAPPER.writeValueAsString(data));
        executeAndPrint(request);
    }

    /**
     * Fetches a single document by its id.
     *
     * @throws IOException if the request fails
     */
    @Test
    public void testQueryData() throws IOException {
        Request request = new Request("GET", "/haoke/house/RKF3e4QBtVAmf7zPShsz");
        executeAndPrint(request);
    }

    /**
     * Runs a {@code match} query on the {@code title} field.
     *
     * @throws IOException if the request fails
     */
    @Test
    public void testSearchData() throws IOException {
        Request request = new Request("POST", "/haoke/house/_search");
        String searchJson = "{\"query\": {\"match\": {\"title\": \"拎包入住\"}}}";
        request.setJsonEntity(searchJson);
        request.addParameter("pretty", "true");
        executeAndPrint(request);
    }

    /**
     * Sends the request and prints the status line and the body of the response.
     *
     * @param request the request to perform
     * @throws IOException if the request fails or the body cannot be read
     */
    private void executeAndPrint(Request request) throws IOException {
        Response response = this.restClient.performRequest(request);
        System.out.println(response.getStatusLine());
        System.out.println(EntityUtils.toString(response.getEntity()));
    }
}
1.1.3 高级别Rest客户端
先引入高级别依赖
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.5.4</version>
</dependency>
<!-- With this artifact the separate low-level dependency is no longer needed: the high-level client already depends on the matching low-level client. -->
测试代码如下:
package cn.itcast.es.rest;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates the Elasticsearch high-level REST client: index (sync and async), get,
 * exists, delete, update and search against the {@code haoke/house} index.
 *
 * <p>The target node defaults to the address used originally and can be overridden with the
 * JVM system properties {@code es.host} and {@code es.port}.
 */
public class TestRestHighLevel {

    /** Elasticsearch host; override with {@code -Des.host=...}. */
    private static final String ES_HOST = System.getProperty("es.host", "134.175.110.184");

    /** Elasticsearch HTTP port; override with {@code -Des.port=...}. */
    private static final int ES_PORT = Integer.getInteger("es.port", 9200);

    /** Upper bound, in seconds, for waiting on the asynchronous index callback. */
    private static final long ASYNC_TIMEOUT_SECONDS = 20;

    /** High-level REST client, created before and closed after each test. */
    private RestHighLevelClient client;

    /** Builds the high-level client on top of a low-level client builder. */
    @Before
    public void init() {
        RestClientBuilder restClientBuilder = RestClient.builder(
                new HttpHost(ES_HOST, ES_PORT, "http"));
        this.client = new RestHighLevelClient(restClientBuilder);
    }

    /**
     * Closes the client.
     *
     * @throws Exception if closing the client fails
     */
    @After
    public void after() throws Exception {
        // JUnit runs @After even when @Before threw, in which case the client was never built.
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * Indexes a new document synchronously.
     *
     * @throws Exception if the request fails
     */
    @Test
    public void testCreate() throws Exception {
        Map<String, Object> data = new HashMap<>();
        data.put("id", "2002");
        data.put("title", "南京西路 拎包入住 一室一厅");
        data.put("price", "4500");
        // Indexing a document is expressed as an IndexRequest ...
        IndexRequest indexRequest = new IndexRequest("haoke", "house")
                .source(data);
        // ... and answered with an IndexResponse.
        IndexResponse indexResponse = this.client.index(indexRequest,
                RequestOptions.DEFAULT);
        printIndexResponse(indexResponse);
    }

    /**
     * Indexes a new document asynchronously and waits for the callback.
     *
     * <p>The test returns as soon as either callback has run, and fails if neither runs
     * within {@value #ASYNC_TIMEOUT_SECONDS} seconds.
     *
     * @throws Exception if the wait is interrupted
     */
    @Test
    public void testCreateAsync() throws Exception {
        Map<String, Object> data = new HashMap<>();
        data.put("id", "2003");
        data.put("title", "南京东路 最新房源 二室一厅");
        data.put("price", "5500");
        IndexRequest indexRequest = new IndexRequest("haoke", "house")
                .source(data);
        // Released by whichever callback fires, so the test need not sleep a fixed time.
        final CountDownLatch done = new CountDownLatch(1);
        this.client.indexAsync(indexRequest, RequestOptions.DEFAULT,
                new ActionListener<IndexResponse>() {
                    @Override
                    public void onResponse(IndexResponse indexResponse) {
                        try {
                            printIndexResponse(indexResponse);
                        } finally {
                            done.countDown();
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        System.out.println(e);
                        done.countDown();
                    }
                });
        // Keep the client open (it is closed in after()) until the callback has run.
        if (!done.await(ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            throw new AssertionError("indexAsync did not complete within "
                    + ASYNC_TIMEOUT_SECONDS + " seconds");
        }
    }

    /**
     * Fetches a document by id, restricting the returned source to selected fields.
     *
     * @throws Exception if the request fails
     */
    @Test
    public void testQuery() throws Exception {
        GetRequest getRequest = new GetRequest("haoke", "house",
                "RKF3e4QBtVAmf7zPShsz");
        // Only these source fields are returned.
        String[] includes = new String[]{"title", "id"};
        String[] excludes = Strings.EMPTY_ARRAY;
        FetchSourceContext fetchSourceContext =
                new FetchSourceContext(true, includes, excludes);
        getRequest.fetchSourceContext(fetchSourceContext);
        GetResponse response = this.client.get(getRequest,
                RequestOptions.DEFAULT);
        System.out.println("数据 -> " + response.getSource());
    }

    /**
     * Checks whether a document exists.
     *
     * @throws Exception if the request fails
     */
    @Test
    public void testExists() throws Exception {
        GetRequest getRequest = new GetRequest("haoke", "house",
                "GkpdE2gBCKv8opxuOj12");
        // Do not fetch the source; only existence is of interest.
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        boolean exists = this.client.exists(getRequest, RequestOptions.DEFAULT);
        System.out.println("exists -> " + exists);
    }

    /**
     * Deletes a document by id.
     *
     * @throws Exception if the request fails
     */
    @Test
    public void testDelete() throws Exception {
        DeleteRequest deleteRequest = new DeleteRequest("haoke", "house",
                "Q6F3e4QBtVAmf7zPShsz");
        DeleteResponse response = this.client.delete(deleteRequest,
                RequestOptions.DEFAULT);
        System.out.println(response.status()); // OK or NOT_FOUND
    }

    /**
     * Partially updates a document by id.
     *
     * @throws Exception if the request fails
     */
    @Test
    public void testUpdate() throws Exception {
        UpdateRequest updateRequest = new UpdateRequest("haoke", "house",
                "SaF5e4QBtVAmf7zPpBvS");
        Map<String, Object> data = new HashMap<>();
        data.put("title", "张江高科2");
        data.put("price", "5000");
        updateRequest.doc(data);
        UpdateResponse response = this.client.update(updateRequest,
                RequestOptions.DEFAULT);
        System.out.println("version -> " + response.getVersion());
    }

    /**
     * Runs a {@code match} query on {@code title} and prints the first five hits.
     *
     * @throws Exception if the request fails
     */
    @Test
    public void testSearch() throws Exception {
        SearchRequest searchRequest = new SearchRequest("haoke");
        searchRequest.types("house");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("title", "拎包入住"));
        sourceBuilder.from(0);
        sourceBuilder.size(5);
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        searchRequest.source(sourceBuilder);
        SearchResponse search = this.client.search(searchRequest,
                RequestOptions.DEFAULT);
        SearchHits hits = search.getHits();
        System.out.println("搜索到 " + hits.totalHits + " 条数据.");
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * Prints the id, index, type, version, result and shard info of an index response.
     *
     * @param indexResponse the response to print
     */
    private static void printIndexResponse(IndexResponse indexResponse) {
        System.out.println("id->" + indexResponse.getId());
        System.out.println("index->" + indexResponse.getIndex());
        System.out.println("type->" + indexResponse.getType());
        System.out.println("version->" + indexResponse.getVersion());
        System.out.println("result->" + indexResponse.getResult());
        System.out.println("shardInfo->" + indexResponse.getShardInfo());
    }
}
1.2 Spring Data 接入Elasticsearch
Spring Data项目对Elasticsearch做了支持,其目的就是简化对Elasticsearch的操作。
application.properties
spring.application.name = itcast-elasticsearch
spring.data.elasticsearch.cluster-name=es-itcast-cluster
# 9200 is the RESTful (HTTP) port; 9300 is the port used by the Java API client.
# The key is written in canonical kebab-case, matching cluster-name above.
spring.data.elasticsearch.cluster-nodes=ip:9300
启动类:
package cn.itcast.es.rest;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point for the Elasticsearch demo application.
 */
@SpringBootApplication
public class MyApplication {

    /**
     * Boots the Spring application context with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(MyApplication.class);
        application.run(args);
    }
}
User对象:
package cn.itcast.es.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
/**
 * User document stored in the {@code itcast} index under type {@code user}.
 *
 * <p>Accessors and both constructors are generated by Lombok. The field declaration order
 * is significant because it fixes the parameter order of the all-args constructor.
 */
@Document(indexName = "itcast", type = "user", shards = 6, replicas = 1)
@NoArgsConstructor
@AllArgsConstructor
@Data
public class User {

    /** Document identifier. */
    @Id
    private Long id;

    /** User name; declared as a stored field. */
    @Field(store = true)
    private String name;

    /** Age of the user. */
    @Field
    private Integer age;

    /** Free-text list of hobbies. */
    @Field
    private String hobby;
}
单测代码:
package cn.itcast.es.rest;
import cn.itcast.es.pojo.User;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates Spring Data Elasticsearch through {@link ElasticsearchTemplate}: single and
 * bulk indexing, partial update, delete and a paged search on {@link User} documents.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringBootES {

    /** Template injected by Spring for all Elasticsearch operations. */
    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;

    /** Indexes a single user and prints the value returned by the template. */
    @Test
    public void testSave() {
        User user = new User();
        user.setId(1001L);
        user.setAge(20);
        user.setName("张三");
        user.setHobby("足球、篮球、听音乐");
        IndexQuery indexQuery = new IndexQueryBuilder().withObject(user).build();
        String index = this.elasticsearchTemplate.index(indexQuery);
        System.out.println(index);
    }

    /** Indexes 5000 users in one bulk request and prints the elapsed milliseconds. */
    @Test
    public void testBulk() {
        List<IndexQuery> list = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            User user = new User();
            user.setId(1001L + i);
            user.setAge(i % 50 + 10);
            user.setName("张三" + i);
            user.setHobby("足球、篮球、听音乐");
            IndexQuery indexQuery = new IndexQueryBuilder().withObject(user).build();
            list.add(indexQuery);
        }
        long start = System.currentTimeMillis();
        this.elasticsearchTemplate.bulkIndex(list);
        // The author's sample run printed 7836.
        System.out.println("用时:" + (System.currentTimeMillis() - start));
    }

    /**
     * Partially updates one document. For a full replacement, simply index the document
     * again with the same id.
     */
    @Test
    public void testUpdate() {
        IndexRequest indexRequest = new IndexRequest();
        // User.age is an Integer, so send a number rather than the string "30".
        indexRequest.source("age", 30);
        UpdateQuery updateQuery = new UpdateQueryBuilder()
                .withId("1001")
                .withClass(User.class)
                .withIndexRequest(indexRequest).build();
        this.elasticsearchTemplate.update(updateQuery);
    }

    /** Deletes the user with id 1001. */
    @Test
    public void testDelete() {
        this.elasticsearchTemplate.delete(User.class, "1001");
    }

    /** Runs a {@code match} query on {@code name} and prints the first page of results. */
    @Test
    public void testSearch() {
        // Page indexes are zero-based: page 0 is the first page, 10 results per page.
        PageRequest pageRequest = PageRequest.of(0, 10);
        SearchQuery searchQuery = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.matchQuery("name", "张三")) // match query
                .withPageable(pageRequest)
                .build();
        AggregatedPage<User> users =
                this.elasticsearchTemplate.queryForPage(searchQuery, User.class);
        System.out.println("总页数:" + users.getTotalPages()); // total number of pages
        for (User user : users.getContent()) { // the hits on the requested page
            System.out.println(user);
        }
    }
}