1、 简介REST Client
SpringBoot整合ES的常见方式有四种:TransportClient、Spring Data Elasticsearch、Elasticsearch SQL、REST Client。
1.1 TransportClient:即将被弃用。
1.2 Spring Data Elasticsearch:Spring提供的封装方式,底层好像也是基于TransportClient,对Elasticsearch 7.0之后的版本支持不太好。
1.3 Elasticsearch SQL:用SQL代替Elasticsearch的Query DSL进行查询。早期有一个第三方插件Elasticsearch-SQL,后来随着官方也开始做这方面的支持,这个插件好像就没怎么更新了。
1.4 REST Client:官方推荐使用,所以我们采用这种方式。它分为Low Level REST Client和High Level REST Client两种:Low Level REST Client是早期推出的API,比较简陋,还需要自己去拼写Query DSL;High Level REST Client使用起来更方便,更符合面向对象的风格,所以选择使用High Level REST Client。
2、 引用依赖
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.5.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.5.1</version>
</dependency>
<!-- commons-pool2:通用对象池,本文用它来池化RestHighLevelClient -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.5.0</version>
</dependency>
3、 添加配置
# Elasticsearch配置
es:
  host: 127.0.0.1
  port: 9200
4、添加配置类
ESConf.java
package com.example.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Elasticsearch connection settings, populated from the {@code es.host} and
 * {@code es.port} configuration properties.
 */
@Component
public class ESConf {

    /** Host name or address, bound from {@code es.host}. */
    @Value("${es.host}")
    private String host;

    /** Port number, bound from {@code es.port}. */
    @Value("${es.port}")
    private int port;

    /** @return the configured host */
    public String getHost() {
        return this.host;
    }

    /** @param host the host to use */
    public void setHost(String host) {
        this.host = host;
    }

    /** @return the configured port */
    public int getPort() {
        return this.port;
    }

    /** @param port the port to use */
    public void setPort(int port) {
        this.port = port;
    }
}
5、对象池配置类
MyEsClientPool.java
package com.example.base;
import com.example.config.ESConf;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author: SongBin
* @date: 2020/4/28 9:12
* @description:
* @version: 1.0
*/
@Component
public class MyEsClientPool {
//private static final String HOST = "192.168.20.138"; // 集群节点
//private static final int PORT = 9200;
// 对象池配置类,不写也可以,采用默认配置
private GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
// 采用默认配置maxTotal是8,池中有8个client
public MyEsClientPool() {
poolConfig.setMaxTotal(20);
poolConfig.setMaxIdle(5);
poolConfig.setTestWhileIdle(true);
poolConfig.setTestOnBorrow(false);
poolConfig.setTimeBetweenEvictionRunsMillis(300000L);
poolConfig.setMinIdle(5);
}
@Autowired
public ESConf esConf;
private GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<RestHighLevelClient>(new PooledObjectFactory<RestHighLevelClient>() {
public PooledObject<RestHighLevelClient> makeObject() throws Exception {
RestHighLevelClient client = null;
try {
client = new RestHighLevelClient(RestClient.builder(new HttpHost(esConf.getHost(), esConf.getPort(), "http")));
} catch (Exception e) {
e.printStackTrace();
}
return new DefaultPooledObject<RestHighLevelClient>(client);
}
public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
RestHighLevelClient client = pooledObject.getObject();
client.close();
}
public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) {
return true;
}
public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
System.out.println("激活客户端");
}
public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
System.out.println("释放客户端");
}
}, poolConfig);
/**
* 获得对象
*
* @return
* @throws Exception
*/
public RestHighLevelClient getClient() throws Exception {
RestHighLevelClient client = clientPool.borrowObject();
return client;
}
/**
* 归还对象
*
* @param client
*/
public void returnClient(RestHighLevelClient client) {
if (client != null) {
clientPool.returnObject(client);
}
}
/*public static void main(String[] args) throws Exception {
RestHighLevelClient client = MyEsClientPool.getClient();
System.out.println(client);
}*/
}
6、ES操作工具类
MyEsUtils.java
package com.example.base;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author: SongBin
* @date: 2020/4/27 10:11
* @description:
* @version: 1.0
*/
@Component
public class MyEsUtils {
private static RestHighLevelClient client = null;
@Autowired
private MyEsClientPool MyEsClientPool;
/**
 * Returns the shared client, borrowing it from the pool on first use.
 *
 * <p>The reference is kept in a static field, so initialisation is guarded by the class
 * lock to stop concurrent callers from each borrowing (and then leaking) a client.
 *
 * @return the shared client, never {@code null}
 * @throws IllegalStateException if no client can be obtained from the pool
 */
public RestHighLevelClient getRestHighLevelClient() {
    synchronized (MyEsUtils.class) {
        if (client == null) {
            try {
                client = MyEsClientPool.getClient();
            } catch (Exception e) {
                // Every caller dereferences the result, so fail here with the real cause
                // instead of returning null and failing later with a bare NPE.
                throw new IllegalStateException("Unable to obtain an Elasticsearch client from the pool", e);
            }
        }
        return client;
    }
}
/**
 * Returns the shared client to the pool and forgets it.
 *
 * <p>The static reference is cleared so that the next
 * {@link #getRestHighLevelClient()} call borrows again instead of reusing a client the
 * pool already considers returned, and so that calling this method twice is harmless.
 */
public void closeClient(){
    synchronized (MyEsUtils.class) {
        if (client != null) {
            RestHighLevelClient borrowed = client;
            client = null;
            MyEsClientPool.returnClient(borrowed);
        }
    }
}
/**
 * Builds a bool query in which every entry of the map becomes a mandatory
 * wildcard clause of the form {@code *value*} on the entry's field.
 *
 * @param filedsMap search conditions (key: field name, value: text to look for)
 * @return the assembled bool query
 */
public BoolQueryBuilder getQueryBuilder(Map<String,String> filedsMap){
    BoolQueryBuilder query = QueryBuilders.boolQuery();
    for (Map.Entry<String,String> condition : filedsMap.entrySet()) {
        String pattern = "*" + condition.getValue() + "*";
        query.must(QueryBuilders.wildcardQuery(condition.getKey(), pattern));
    }
    return query;
}
/**
 * Runs a paged search and returns the source of every hit, with highlighted
 * fragments substituted for the requested fields.
 *
 * @param queryBuilder query to execute
 * @param esIndex      index name
 * @param pageNo       1-based page number; values below 1 are treated as the first page
 * @param pagesize     page size
 * @param glFields     fields whose value is replaced by its highlighted fragments;
 *                     may be {@code null} or empty to leave the sources untouched
 * @return the hit sources in result order; empty if the request fails with an I/O error
 */
public List<Map<String,Object>> getPageResultList(QueryBuilder queryBuilder, String esIndex, int pageNo, int pagesize,List<String> glFields){
    SearchRequest searchRequest = new SearchRequest(esIndex);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Highlight every field, wrapping matches in a red <span>.
    HighlightBuilder highlightBuilder = new HighlightBuilder().field("*").requireFieldMatch(false);
    highlightBuilder.preTags("<span style=\"color:red\">");
    highlightBuilder.postTags("</span>");
    searchSourceBuilder.highlighter(highlightBuilder);
    int from = pageNo >= 1 ? (pageNo - 1) * pagesize : 0;
    searchSourceBuilder.query(queryBuilder).from(from).size(pagesize);
    searchRequest.source(searchSourceBuilder);

    List<Map<String,Object>> list = new LinkedList<>();
    SearchResponse searchResponse;
    try {
        searchResponse = getRestHighLevelClient().search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        // Without a response there is nothing to read; report and return no rows.
        e.printStackTrace();
        return list;
    }
    for (SearchHit hit : searchResponse.getHits()) {
        Map<String, Object> source = hit.getSourceAsMap();
        if (glFields != null) {
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            for (String fieldName : glFields) {
                HighlightField highlighted = highlightFields.get(fieldName);
                if (highlighted != null) {
                    StringBuilder merged = new StringBuilder();
                    for (Text fragment : highlighted.fragments()) {
                        merged.append(fragment);
                    }
                    // Replace the stored value with the concatenated highlighted fragments.
                    source.put(fieldName, merged.toString());
                }
            }
        }
        list.add(source);
    }
    return list;
}
/**
 * Full-text search with paging, highlighting, optional sorting and terms aggregations.
 *
 * <p>The returned map holds {@code data} (hit sources with highlighted keyword fields),
 * {@code total}, {@code totalPage}, {@code pageSize}, {@code pageNo} and {@code aggData}.
 * If the request fails with an I/O error the map is returned empty.
 *
 * @param query search parameters
 * @return the result map described above
 * @throws Exception declared for callers; I/O failures are reported and not rethrown
 */
public Map<String,Object> search(SearchRequestQuery query) throws Exception {
    RestHighLevelClient restClient = getRestHighLevelClient();
    Map<String,Object> result = new HashMap<>();
    List<Map<String,Object>> list = new ArrayList<>();
    // 1. Target index.
    SearchRequest searchRequest = new SearchRequest(query.getEsIndex());
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    // 2. Paging: 1-based page number, defaults 1 / 10. Out-of-range values fall back to
    //    the defaults so the offset is never negative and the page count never divides by zero.
    int pageNo = 1, pageSize = 10;
    if (query.getPageNo() != null && query.getPageNo() >= 1) {
        pageNo = query.getPageNo();
    }
    if (query.getPageSize() != null && query.getPageSize() >= 1) {
        pageSize = query.getPageSize();
    }
    sourceBuilder.from((pageNo - 1) * pageSize);
    sourceBuilder.size(pageSize);
    // 3. Base query plus and/or/not filters.
    QueryBuilder queryBuilder = buildBasicQueryWithFilter(query);
    sourceBuilder.query(queryBuilder);
    // 4. Give up after one minute.
    sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
    // 5. Highlight the keyword fields.
    String[] keywordFields = query.getKeywordFields();
    if (keywordFields == null) {
        keywordFields = new String[0];
    }
    HighlightBuilder highlightBuilder = new HighlightBuilder();
    for (String field : keywordFields) {
        highlightBuilder.field(field);
    }
    // requireFieldMatch(true): only fields the query actually matched are highlighted.
    highlightBuilder.requireFieldMatch(true);
    highlightBuilder.preTags("<span style='color:red'>");
    highlightBuilder.postTags("</span>");
    sourceBuilder.highlighter(highlightBuilder);
    // 6. Sorting; a missing direction means ascending.
    String sortBy = query.getSortBy();
    Boolean desc = query.getIsDesc();
    if (StringUtils.isNotBlank(sortBy)) {
        sourceBuilder.sort(new FieldSortBuilder(sortBy).order(Boolean.TRUE.equals(desc) ? SortOrder.DESC : SortOrder.ASC));
    }
    // 7. Terms aggregations (aggregation name -> field).
    Map<String, String> aggs = query.getAggMap();
    if (aggs != null) {
        for (Map.Entry<String, String> entry : aggs.entrySet()) {
            String aggName = entry.getKey();
            String aggFiled = entry.getValue();
            // An aggregation cannot be built without a name; result processing skips
            // unnamed entries too, so skip them here as well.
            if (aggName != null) {
                sourceBuilder.aggregation(AggregationBuilders.terms(aggName).field(aggFiled+".keyword"));
            }
        }
    }
    // 8. Restrict the returned source to the requested fields (no explicit excludes).
    sourceBuilder.fetchSource(query.getSourceFilter(),null);
    // 9. Execute.
    searchRequest.source(sourceBuilder);
    try {
        SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
        for (SearchHit doc : searchResponse.getHits().getHits()) {
            Map<String, Object> sourceAsMap = doc.getSourceAsMap();
            Map<String, HighlightField> highlightFields = doc.getHighlightFields();
            for (String field : keywordFields) {
                HighlightField highlighted = highlightFields.get(field);
                if (highlighted != null) {
                    StringBuilder merged = new StringBuilder();
                    for (Text fragment : highlighted.fragments()) {
                        merged.append(fragment);
                    }
                    // Replace the stored value with its highlighted form.
                    sourceAsMap.put(field, merged.toString());
                }
            }
            // Exactly one row per hit, so no de-duplication pass is needed and
            // distinct documents with identical content are both kept.
            list.add(sourceAsMap);
        }
        int total = (int) searchResponse.getHits().getTotalHits().value;
        result.put("data",list);
        result.put("total",total);
        result.put("totalPage",total == 0 ? 0 : (total - 1) / pageSize + 1);
        result.put("pageSize",pageSize);
        result.put("pageNo",pageNo);
        // Aggregation results.
        Aggregations aggregations = searchResponse.getAggregations();
        List<Object> aggData = new ArrayList<>();
        if (aggregations != null) {
            aggData = getAggData(aggregations,query);
        }
        result.put("aggData",aggData);
    } catch (IOException e) {
        // Keep the best-effort contract (empty result) but do not hide the failure.
        e.printStackTrace();
    }
    return result;
}
/**
 * Converts the terms aggregations of a response into a list of groups.
 *
 * <p>Each group is a map with {@code aggregationName}, {@code aggregationField} and
 * {@code aggregationData} (bucket key -> document count, in bucket order). Entries of
 * the query's aggregation map without a name are skipped; a named aggregation that is
 * absent from the response yields a group with empty data.
 *
 * @param aggregations aggregations returned by the search, may be {@code null}
 * @param query        the query that requested them
 * @return one group per named aggregation; empty if there is nothing to report
 */
private static List<Object> getAggData( Aggregations aggregations ,SearchRequestQuery query) {
    List<Object> result = new ArrayList<>();
    Map<String, String> requested = query.getAggMap();
    if (aggregations == null || requested == null) {
        return result;
    }
    for (Map.Entry<String, String> entry : requested.entrySet()) {
        String aggName = entry.getKey();
        String aggFiled = entry.getValue();
        if (aggName == null) {
            continue;
        }
        LinkedHashMap<String,Object> counts = new LinkedHashMap<>();
        Terms aggregation = aggregations.get(aggName);
        if (aggregation != null) {
            for (Terms.Bucket bucket : aggregation.getBuckets()) {
                counts.put(bucket.getKey().toString(),bucket.getDocCount());
            }
        }
        LinkedHashMap<String,Object> groupItem = new LinkedHashMap<>();
        groupItem.put("aggregationName",aggName);
        groupItem.put("aggregationField",aggFiled);
        groupItem.put("aggregationData",counts);
        result.add(groupItem);
    }
    return result;
}
/**
 * Builds the bool query for a search request: and/or/not term filters, an optional
 * date range, and either a multi-field keyword match or match-all.
 *
 * <p>The filter map is keyed by relation ({@code "and"}, {@code "or"}, {@code "not"});
 * each value maps field names to the term to compare with. Unknown relations are ignored.
 *
 * @param query search parameters
 * @return the assembled query
 */
private QueryBuilder buildBasicQueryWithFilter( SearchRequestQuery query ) {
    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
    BoolQueryBuilder shouldQuery = QueryBuilders.boolQuery();
    Map<String, Map<String,String>> filter = query.getFilter();
    if (filter != null) {
        for (Map.Entry<String, Map<String,String>> entry : filter.entrySet()) {
            String relation = entry.getKey();
            Map<String, String> conditions = entry.getValue();
            if (relation == null || conditions == null) {
                continue;
            }
            for (Map.Entry<String, String> condition : conditions.entrySet()) {
                QueryBuilder term = QueryBuilders.termQuery(condition.getKey(), condition.getValue());
                // String switch compares by value; the keys are not guaranteed to be
                // interned, so reference comparison (==) would silently drop the filter.
                switch (relation) {
                    case "and":
                        queryBuilder.filter(term);
                        break;
                    case "or":
                        shouldQuery.should(term);
                        break;
                    case "not":
                        queryBuilder.mustNot(term);
                        break;
                    default:
                        break;
                }
            }
        }
    }
    // Date range, e.g. 2019-07-01 to 2019-07-17: needs a field and at least one bound.
    boolean hasStart = StringUtils.isNotBlank(query.getStartDate());
    boolean hasEnd = StringUtils.isNotBlank(query.getEndDate());
    if (StringUtils.isNotBlank(query.getDateField()) && (hasStart || hasEnd)) {
        queryBuilder.must(QueryBuilders.rangeQuery(query.getDateField())
                .from(hasStart ? query.getStartDate() : null)
                .to(hasEnd ? query.getEndDate() : null));
    }
    // OR filters go into their own bool query wrapped in must, so they stay mandatory
    // next to the other must clauses. They apply whether or not a keyword was given.
    if (shouldQuery.hasClauses()) {
        queryBuilder.must(shouldQuery);
    }
    if (query.getKeyword() == null || "".equals(query.getKeyword())) {
        // No keyword: match everything that passes the filters.
        queryBuilder.must(QueryBuilders.matchAllQuery());
    } else {
        // Keyword may match any of the keyword fields.
        queryBuilder.must(QueryBuilders.multiMatchQuery(query.getKeyword(), query.getKeywordFields()));
    }
    return queryBuilder;
}
/**
 * Runs a paged, optionally sorted search and returns each hit's source converted by
 * {@link #getMapValueForLinkedHashMap(Map)}.
 *
 * @param queryBuilder query to execute
 * @param esIndex      index name
 * @param pageNo       1-based page number; values below 1 are treated as the first page
 * @param pagesize     page size
 * @param sortBuilder  sort to apply, or {@code null} for the default order
 * @param includes     source fields to return, or {@code null}/empty for no include list
 * @param excludes     source fields to drop, or {@code null}/empty for no exclude list
 * @return the converted hit sources in result order; empty if the request fails with an I/O error
 */
public List<LinkedHashMap<String,Object>> getPageResultListLinked(QueryBuilder queryBuilder, String esIndex, int pageNo, int pagesize, SortBuilder sortBuilder, String[] includes, String[] excludes){
    SearchRequest searchRequest = new SearchRequest(esIndex);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    int from = pageNo >= 1 ? (pageNo - 1) * pagesize : 0;
    searchSourceBuilder.query(queryBuilder).from(from).size(pagesize);
    if (sortBuilder != null) {
        searchSourceBuilder.sort(sortBuilder);
    }
    boolean hasIncludes = includes != null && includes.length > 0;
    boolean hasExcludes = excludes != null && excludes.length > 0;
    if (hasIncludes || hasExcludes) {
        searchSourceBuilder.fetchSource(includes,excludes);
    }
    searchRequest.source(searchSourceBuilder);

    List<LinkedHashMap<String,Object>> list = new LinkedList<>();
    SearchResponse searchResponse;
    try {
        searchResponse = getRestHighLevelClient().search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        // Without a response there is nothing to read; report and return no rows.
        e.printStackTrace();
        return list;
    }
    for (SearchHit hit : searchResponse.getHits()) {
        list.add(getMapValueForLinkedHashMap(hit.getSourceAsMap()));
    }
    return list;
}
/**
 * Copies a map into a {@link LinkedHashMap}, lower-casing the first character of every
 * key (keys are converted with {@code toString()}) and converting nested maps the same way.
 *
 * <p>Keys that hold a nested map are normalised like all other keys. A {@code null} key
 * is kept as {@code null}, and a {@code null} input yields an empty map.
 *
 * @param dataMap map to convert, may be {@code null}
 * @return a new map in the iteration order of {@code dataMap}
 */
public static LinkedHashMap getMapValueForLinkedHashMap(Map dataMap) {
    LinkedHashMap returnMap = new LinkedHashMap();
    if (dataMap == null) {
        return returnMap;
    }
    for (Object item : dataMap.entrySet()) {
        Map.Entry entry = (Map.Entry) item;
        Object rawKey = entry.getKey();
        Object key = rawKey == null ? null : toLowerCaseFirstOne(rawKey.toString());
        Object value = entry.getValue();
        if (value instanceof Map) {
            returnMap.put(key, getMapValueForLinkedHashMap((Map) value));
        } else {
            returnMap.put(key, value);
        }
    }
    return returnMap;
}

/**
 * Lower-cases the first character of {@code s}; empty strings and strings that already
 * start with a lower-case character are returned unchanged.
 */
private static String toLowerCaseFirstOne(String s) {
    if (s.isEmpty() || Character.isLowerCase(s.charAt(0))) {
        return s;
    }
    return Character.toLowerCase(s.charAt(0)) + s.substring(1);
}
/**
 * Counts the documents of an index that match a query.
 *
 * @param queryBuilder query to count matches for
 * @param esIndex      index name
 * @return the number of matches, or 0 if the request fails with an I/O error
 */
public Long getResultCount(QueryBuilder queryBuilder, String esIndex){
    CountRequest countRequest = new CountRequest(esIndex);
    countRequest.query(queryBuilder);
    long matches = 0L;
    try {
        matches = getRestHighLevelClient().count(countRequest, RequestOptions.DEFAULT).getCount();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return matches;
}
/**
 * Counts all documents of one index.
 *
 * @param index index name
 * @return the number of documents in {@code index}, or 0 if the request fails with an I/O error
 */
public long getDocCount(String index){
    // Scope the count to the requested index; a request without indices counts every index.
    CountRequest countRequest = new CountRequest(index);
    countRequest.query(QueryBuilders.matchAllQuery());
    try {
        return getRestHighLevelClient().count(countRequest, RequestOptions.DEFAULT).getCount();
    } catch (IOException e) {
        // Same failure convention as getResultCount: report and return 0.
        e.printStackTrace();
        return 0L;
    }
}
/**
 * Checks whether an index exists and prints the outcome.
 *
 * @param esIndex index name
 * @return the server's answer; {@code true} if the request fails with an I/O error
 */
public boolean isIndexExist(String esIndex){
    GetIndexRequest lookup = new GetIndexRequest(esIndex);
    boolean found = true;
    try {
        found = getRestHighLevelClient().indices().exists(lookup, RequestOptions.DEFAULT);
        String template = found ? "索引%s已存在" : "索引%s不存在";
        System.out.println(String.format(template, esIndex));
    } catch (IOException e) {
        e.printStackTrace();
    }
    return found;
}
/**
 * Creates an index with the given shard/replica counts and a flat field mapping,
 * unless {@link #isIndexExist(String)} reports that it is already there.
 *
 * @param esIndex      index name
 * @param shards       number of primary shards
 * @param replications number of replicas
 * @param fileds       field name -> field type
 */
public void createIndex(String esIndex,int shards,int replications,Map<String,String> fileds){
    if (isIndexExist(esIndex)) {
        System.out.println(String.format("索引%s已存在",esIndex));
        return;
    }
    try {
        // Mapping body: {"properties": {"<field>": {"index": "true", "type": "<type>"}, ...}}
        XContentBuilder mapping = XContentFactory.jsonBuilder();
        mapping.startObject();
        mapping.field("properties");
        mapping.startObject();
        for (Map.Entry<String,String> column : fileds.entrySet()) {
            mapping.field(column.getKey());
            mapping.startObject();
            mapping.field("index","true");
            mapping.field("type",column.getValue());
            mapping.endObject();
        }
        mapping.endObject();
        mapping.endObject();

        Settings.Builder indexSettings = Settings.builder()
                .put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replications);
        CreateIndexRequest request = new CreateIndexRequest(esIndex);
        request.settings(indexSettings);
        request.mapping(mapping);

        CreateIndexResponse response = getRestHighLevelClient().indices().create(request, RequestOptions.DEFAULT);
        String template = response.isAcknowledged() ? "索引%s创建成功" : "索引%s创建失败";
        System.out.println(String.format(template, esIndex));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Deletes an index and prints whether the server acknowledged the request.
 *
 * @param esIndex index name
 */
public void deleteIndex(String esIndex){
    DeleteIndexRequest removal = new DeleteIndexRequest(esIndex);
    try {
        AcknowledgedResponse reply = getRestHighLevelClient().indices().delete(removal, RequestOptions.DEFAULT);
        String template = reply.isAcknowledged() ? "索引%s已删除" : "索引%s删除失败";
        System.out.println(String.format(template, esIndex));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Fetches one document by id.
 *
 * @param esIndex index name
 * @param id      document id
 * @return the document source (field name -> value), or {@code null} if the document
 *         does not exist or the request fails with an I/O error
 */
public Map<String,Object> getDataById(String esIndex,String id){
    GetRequest lookup = new GetRequest(esIndex, id);
    try {
        GetResponse reply = getRestHighLevelClient().get(lookup,RequestOptions.DEFAULT);
        return reply.isExists() ? reply.getSource() : null;
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
/**
 * Partially updates one document and prints whether the response status was OK.
 *
 * @param esIndex      index name
 * @param id           document id
 * @param updateFileds field name -> new value
 */
public void updateDataById(String esIndex,String id,Map<String,Object> updateFileds){
    UpdateRequest request = new UpdateRequest(esIndex, id).doc(updateFileds);
    try {
        UpdateResponse reply = getRestHighLevelClient().update(request, RequestOptions.DEFAULT);
        String template = reply.status() == RestStatus.OK
                ? "更新索引为%s,id为%s的文档成功"
                : "更新索引为%s,id为%s的文档失败";
        System.out.println(String.format(template, reply.getIndex(), reply.getId()));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Deletes one document by id and prints whether the result was {@code DELETED}.
 *
 * @param esIndex index name
 * @param id      document id
 */
public void deleteDataById(String esIndex,String id){
    DeleteRequest removal = new DeleteRequest(esIndex,id);
    try {
        DeleteResponse reply = getRestHighLevelClient().delete(removal, RequestOptions.DEFAULT);
        boolean deleted = reply.getResult() == DocWriteResponse.Result.DELETED;
        String template = deleted ? "id为%s的文档删除成功" : "id为%s的文档删除失败";
        System.out.println(String.format(template, id));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Indexes a batch of documents with one bulk request.
 *
 * <p>If a row contains an {@code "id"} entry, its value becomes the document id and is
 * left out of the stored source; otherwise Elasticsearch generates the id. The caller's
 * maps are not modified. A {@code null} or empty list is a no-op.
 *
 * @param esIndex  index name
 * @param datalist rows to index, each a map of field name -> value
 */
public void bulkLoad(String esIndex,List<Map<String,Object>> datalist){
    if (datalist == null || datalist.isEmpty()) {
        // A bulk request without actions is rejected, so there is nothing to send.
        return;
    }
    BulkRequest bulkRequest = new BulkRequest();
    for (Map<String,Object> data : datalist) {
        Object id = data.get("id");
        if (id != null) {
            // Work on a copy so the caller's row keeps its "id" entry.
            Map<String,Object> source = new HashMap<>(data);
            source.remove("id");
            bulkRequest.add(new IndexRequest(esIndex).id(id.toString()).source(source));
        } else {
            // No id supplied: let Elasticsearch generate one.
            bulkRequest.add(new IndexRequest(esIndex).source(data));
        }
    }
    try {
        BulkResponse response = getRestHighLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
        System.out.println(response.hasFailures());
        if (!response.hasFailures()){
            System.out.println(String.format("索引%s批量插入成功,共插入%d条",esIndex,datalist.size()));
        }else{
            System.out.println(String.format("索引%s批量插入失败",esIndex));
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Manual smoke test: runs an unfiltered, highlighted page query (page 1, 5 rows) against
 * the {@code nv_excel_pickup_origi} index, prints the rows and the match count, then
 * releases the client.
 *
 * <p>NOTE(review): the instance is created with {@code new}, and nothing in this method
 * populates its {@code @Autowired} pool field — confirm how this is meant to be run.
 * Earlier experiments with createIndex, deleteIndex, deleteDataById and bulkLoad were
 * kept here as commented-out code.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final String index = "nv_excel_pickup_origi";
    MyEsUtils utils = new MyEsUtils();

    // No conditions: the resulting bool query has no clauses.
    Map<String ,String> conditions = new HashMap<String,String>();
    BoolQueryBuilder query = utils.getQueryBuilder(conditions);

    List<String> highlighted = new ArrayList<>();
    highlighted.add("name");

    List<Map<String,Object>> rows = utils.getPageResultList(query, index, 1, 5, highlighted);
    System.out.println(rows.size());
    for (Map<String,Object> row : rows) {
        System.out.println(row);
    }
    System.out.println("count:"+utils.getResultCount(query, index));
    utils.closeClient();
}
/**
 * Deletes the documents matching a query in passes of at most {@code maxDocs} documents.
 *
 * <p>The number of passes is derived from a count taken up front. Each pass refreshes
 * the index when it finishes so that the next pass searches the index without the
 * documents just deleted, and the loop stops early once a pass deletes nothing.
 *
 * @param esIndex      index name
 * @param queryBuilder query selecting the documents to delete
 * @param maxDocs      maximum number of documents handled per pass; must be positive
 * @return the total number of documents deleted
 * @throws IOException              if a delete-by-query request fails
 * @throws IllegalArgumentException if {@code maxDocs} is not positive
 */
public long deleteDataByQuery(String esIndex, QueryBuilder queryBuilder, int maxDocs) throws IOException {
    if (maxDocs <= 0) {
        throw new IllegalArgumentException("maxDocs must be positive, got " + maxDocs);
    }
    DeleteByQueryRequest request = new DeleteByQueryRequest(esIndex);
    request.setQuery(queryBuilder);
    // Scroll batch size; 10000 is the largest value used here.
    request.setBatchSize(10000);
    // Keep going when a document changed between the search and the delete.
    request.setConflicts("proceed");
    // Upper bound of documents handled by one request.
    request.setMaxDocs(maxDocs);
    // Make each pass's deletions visible to the following pass.
    request.setRefresh(true);

    Long resultCount = getResultCount(queryBuilder, esIndex);
    System.out.println(String.format("待删除数据%d条",resultCount));
    // Ceiling division in long arithmetic.
    long passes = (resultCount + maxDocs - 1L) / maxDocs;
    long deletedCount = 0L;
    for (long pass = 0; pass < passes; pass++) {
        BulkByScrollResponse response = getRestHighLevelClient().deleteByQuery(request, RequestOptions.DEFAULT);
        long deleted = response.getDeleted();
        if (deleted == 0L) {
            // Nothing left that can be deleted; further passes would only repeat the search.
            break;
        }
        deletedCount += deleted;
    }
    return deletedCount;
}
}
7、控制器
HighLevelRestController.java
package com.example.controller;
import com.alibaba.fastjson.JSON;
import com.example.common.ResponseBean;
import com.example.constant.Constant;
import com.example.model.BookDto;
import com.example.model.User;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import com.example.base.MyEsUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * REST endpoints backed by {@link MyEsUtils}.
 *
 * @author SongBin
 */
@RestController
@RequestMapping("/high")
public class HighLevelRestController {

    private static final Logger logger = LoggerFactory.getLogger(HighLevelRestController.class);

    /** Index queried by the user listing. */
    private static final String USER_INDEX = "mto_user";

    @Autowired
    private MyEsUtils myEsUtils;

    /**
     * Lists users whose {@code name} starts with the given keyword.
     *
     * @param page    1-based page number, defaults to 1
     * @param rows    page size, defaults to 10
     * @param keyword name prefix; when blank, all users are listed
     * @return a response whose payload holds {@code count} (total matches) and
     *         {@code data} (the requested page of users)
     * @throws IOException declared for callers of this endpoint
     */
    @GetMapping("/user")
    public ResponseBean list(@RequestParam(defaultValue = "1") Integer page,
                             @RequestParam(defaultValue = "10") Integer rows,
                             String keyword) throws IOException {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (StringUtils.isBlank(keyword)) {
            // Without a keyword the prefix would become the literal text "null".
            boolQueryBuilder.must(QueryBuilders.matchAllQuery());
        } else {
            boolQueryBuilder.must(QueryBuilders.wildcardQuery("name", keyword + "*"));
        }
        // No highlight substitution: the rows are mapped onto User objects as stored.
        List<Map<String,Object>> list = myEsUtils.getPageResultList(
                boolQueryBuilder, USER_INDEX, page, rows, new ArrayList<String>());
        long total = myEsUtils.getResultCount(boolQueryBuilder, USER_INDEX);
        logger.debug("user search returned {} rows of {} matches", list.size(), total);

        List<User> userList = new ArrayList<>();
        for (Map<String,Object> map : list) {
            userList.add(JSON.parseObject(JSON.toJSONString(map), User.class));
        }

        Map<String, Object> result = new HashMap<String, Object>(16);
        result.put("count", total);
        result.put("data", userList);
        return new ResponseBean(HttpStatus.OK.value(), "查询成功", result);
    }
}
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载