ES 聚合查询 Java API

本文介绍几种常见查询的 ES Java API 写法:分页查询、按时间分组求平均值(同时对多个字段求平均)、按某一字段分组聚合等。

* 分页查询
// Paged warning query built with MyBatis-Plus (fragment of a method that returns TableDataInfo).
SysUser loginUser = SecurityUtils.getLoginUser().getUser();
Long userId = loginUser.getUserId();
LambdaQueryChainWrapper<Warning> query = warningService.lambdaQuery();

// Optional IN filter on the processing status; a list containing null is rejected.
if (req.getStatus() != null && !req.getStatus().isEmpty()) {
    if (req.getStatus().contains(null)) {
        throw new BaseException("错误的'处理状态'字段信息!");
    }
    query.in(Warning::getProcessStatus, req.getStatus());
}

// Each condition below is only added when the corresponding request field is non-empty.
query.eq(StringUtils.isNotEmpty(req.getSiteId()), Warning::getDeptId, req.getSiteId());
query.like(StringUtils.isNotEmpty(req.getDevCode()), Warning::getDevCode, req.getDevCode());
query.like(StringUtils.isNotEmpty(req.getDevName()), Warning::getDevName, req.getDevName());

List<WarningProcess> processes = warningProcessService.getWarningProcess(userId);
if (processes == null || processes.isEmpty()) {
    // No handler records: filter by the site permissions of the logged-in user instead.
    List<Long> userDepts = userService.queryUserDepts(userId);
    if (userDepts == null || userDepts.isEmpty()) {
        // NOTE(review): an empty IN list presumably either produces invalid SQL ("IN ()")
        // or silently drops the permission filter, depending on the MyBatis-Plus version.
        // A user without any site permission gets an empty page instead.
        return new TableDataInfo();
    }
    query.in(Warning::getDeptId, userDepts);
} else {
    List<Long> warnIds = processes.stream()
            .map(WarningProcess::getWarnId)
            .collect(Collectors.toList());
    query.in(CollectionUtils.isNotEmpty(warnIds), Warning::getId, warnIds);
}

query.orderByDesc(Warning::getWarnTime);
TableDataInfo data = warningService.page(req, query);

// Index the departments by ID once, so every row is resolved with a map lookup
// instead of scanning the whole department list per row.
// putIfAbsent keeps the first department for a given ID, like findFirst() did.
java.util.Map<Long, SysDept> deptById = new java.util.HashMap<>();
for (SysDept dept : sysDeptService.list()) {
    deptById.putIfAbsent(dept.getDeptId(), dept);
}
for (Object row : data.getRows()) {
    Warning warnItem = (Warning) row;
    SysDept dept = deptById.get(warnItem.getDeptId());
    if (dept != null) {
        warnItem.setDeptName(dept.getDeptName());
    }
}
return data;
将上面的 MyBatis-Plus 查询转换成 ES 查询的 Java API 写法:
// Elasticsearch version of the paged warning query.
// userId and index are not defined in this fragment; they must already be in scope.
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
        // Offset of the first hit of the requested page.
        .from((req.getPageNum() - 1) * req.getPageSize())
        // Number of hits per page.
        .size(req.getPageSize())
        // Order by warn_time, newest first.
        .sort("warn_time", SortOrder.DESC)
        .trackTotalHits(true);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

// Equivalent of SQL IN on the processing status; a list containing null is rejected,
// matching the validation of the MyBatis-Plus version of this query.
if (req.getStatus() != null && !req.getStatus().isEmpty()) {
    if (req.getStatus().contains(null)) {
        throw new BaseException("错误的'处理状态'字段信息!");
    }
    boolQueryBuilder.filter(QueryBuilders.termsQuery("process_status", req.getStatus()));
}

// term query: exact match on the site ID.
if (StringUtils.isNotEmpty(req.getSiteId())) {
    boolQueryBuilder.filter(QueryBuilders.termQuery("dept_id", req.getSiteId()));
}

// Fuzzy (substring) match, the counterpart of SQL LIKE '%value%'.
// NOTE(review): the request value is not escaped, so '*' and '?' typed by the user act
// as wildcards, and leading-wildcard queries are expensive on large indices.
if (StringUtils.isNotEmpty(req.getDevCode())) {
    boolQueryBuilder.filter(
            QueryBuilders.wildcardQuery("dev_code.keyword", "*" + req.getDevCode() + "*"));
}
if (StringUtils.isNotEmpty(req.getDevName())) {
    boolQueryBuilder.filter(
            QueryBuilders.wildcardQuery("dev_name.keyword", "*" + req.getDevName() + "*"));
}

List<WarningProcess> processes = warningProcessService.getWarningProcess(userId);
if (processes == null || processes.isEmpty()) {
    // No handler records: filter by the site permissions of the logged-in user instead.
    List<Long> userDepts = userService.queryUserDepts(userId);
    // Uses "dept_id", the same field name as the site filter above, rather than
    // "dept_id.keyword", so both filters target the same field.
    boolQueryBuilder.filter(QueryBuilders.termsQuery("dept_id", userDepts));
} else {
    List<Long> warnIds = processes.stream()
            .map(WarningProcess::getWarnId)
            .collect(Collectors.toList());
    if (CollectionUtils.isNotEmpty(warnIds)) {
        // termsQuery is the ES equivalent of SQL IN.
        boolQueryBuilder.filter(QueryBuilders.termsQuery("id", warnIds));
    }
}

// Run the search.
searchSourceBuilder.query(boolQueryBuilder);
SearchResponse searchResponse =
        EsClintUtil.execSearch(restHighLevelClient, searchSourceBuilder, index);

// Total number of matching documents.
long total = searchResponse.getHits().getTotalHits();

// Deserialize each hit's _source JSON into a Warning.
List<Warning> list = new ArrayList<>();
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
    list.add(JSON.parseObject(searchHit.getSourceAsString(), Warning.class));
}

// Wrap the page in the project's table structure.
TableDataInfo info = new TableDataInfo(list, (int) total);
* 桶聚合
-- Per-site statistics: row count, distinct device count and number of rows with status '1'.
SELECT
    dept_name,
    count(d1.id) AS imgcount,
    count(DISTINCT d1.device_code) AS imgcount1,
    sum(CASE d1.`status` WHEN '1' THEN 1 ELSE 0 END) AS warningCount
FROM sys_dept s1
LEFT JOIN dev_shock_data d1 ON s1.dept_id = d1.dept_id
WHERE s1.dept_id != 100
GROUP BY s1.dept_name
上面的 SQL 对应的 ES Java API 代码:
// Per-site statistics computed with a terms (bucket) aggregation on dev_shock_data.
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.mustNot(QueryBuilders.termQuery("dept_id", 100));
searchSourceBuilder.query(boolQueryBuilder);
// Only aggregation results are needed, no raw documents.
searchSourceBuilder.size(0);

// Group by site.
TermsAggregationBuilder aggregationBuilder = AggregationBuilders
        .terms("deptgroup").field("dept_id")
        // Nested bucket per status value.
        .subAggregation(AggregationBuilders.terms("statusgroup").field("status.keyword"))
        // Distinct device count.
        .subAggregation(
                AggregationBuilders.cardinality("imgcount1").field("device_code.keyword"))
        .size(5000);
searchSourceBuilder.aggregation(aggregationBuilder);

SearchResponse searchResponse =
        EsClintUtil.execSearch(restHighLevelClient, searchSourceBuilder, "dev_shock_data");

// Convert every site bucket into a result row.
List<ImageStatisticRes> resList = new ArrayList<>();
Terms deptTerms = searchResponse.getAggregations().get("deptgroup");
Map<String, String> deptMap = EsClintUtil.getDepts(restHighLevelClient);
// IDs of the sites that have at least one document; a set keeps the later
// "sites without data" check at constant cost per site.
java.util.Set<String> seenDeptIds = new java.util.HashSet<>();
for (Terms.Bucket bucket : deptTerms.getBuckets()) {
    String key = bucket.getKey().toString();

    ImageStatisticRes res = new ImageStatisticRes();
    res.setSiteName(deptMap.get(key));
    res.setImageCount((int) bucket.getDocCount());

    // Distinct device count of this site.
    Cardinality deviceCount = bucket.getAggregations().get("imgcount1");
    res.setImageCount1((int) deviceCount.getValue());

    // Default to 0 so a site without any status "1" document reports 0,
    // as the SQL sum(case ...) does, instead of leaving the field unset.
    res.setImageWarningCount(0);
    Terms statusTerms = bucket.getAggregations().get("statusgroup");
    for (Terms.Bucket statusBucket : statusTerms.getBuckets()) {
        // Compare on the string form of the key so the check does not depend on
        // the runtime type returned by getKey().
        if ("1".equals(statusBucket.getKeyAsString())) {
            res.setImageWarningCount((int) statusBucket.getDocCount());
        }
    }

    resList.add(res);
    seenDeptIds.add(key);
}

// Sites that exist in deptMap but produced no bucket have no data: emit all-zero rows.
// NOTE(review): if deptMap contains dept 100 it is emitted here as an empty site,
// although the query above (and the SQL) excludes it; verify EsClintUtil.getDepts.
for (Map.Entry<String, String> dept : deptMap.entrySet()) {
    if (seenDeptIds.contains(dept.getKey())) {
        continue;
    }
    ImageStatisticRes res = new ImageStatisticRes();
    res.setSiteName(dept.getValue());
    res.setImageCount(0);
    res.setImageCount1(0);
    res.setImageWarningCount(0);
    resList.add(res);
}
return resList;
* 时间聚合分组
-- Hourly averages of the three angle columns for one device within a time window.
SELECT
    HOUR(create_time) khour,
    avg(angle_x) averageX,
    avg(angle_y) averageY,
    avg(angle_z) averageZ
FROM dev_shock_data
WHERE device_id = #{deviceId}
  AND create_time > #{startTime}
  AND create_time < #{endTime}
GROUP BY HOUR(create_time)
ORDER BY khour
将上面的 SQL 转换成 ES 对应的 Java API 写法:
// Hourly averages for one device, computed with a date_histogram aggregation.
// deviceId, startTime, endTime and the field names x, y, z must already be in scope.
final ArrayList<GroupByHour> groupByHours = new ArrayList<>();

// Both conditions go into filter context: no relevance score is needed for an aggregation.
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.filter(QueryBuilders.termQuery("device_id", deviceId));
boolQueryBuilder.filter(QueryBuilders.rangeQuery("create_time").gt(startTime).lt(endTime));

// size(0): only aggregation results are read, so no hits (and no sort) are requested.
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(0);
searchSourceBuilder.query(boolQueryBuilder);

AvgAggregationBuilder averageX = AggregationBuilders.avg("averageX").field(x);
AvgAggregationBuilder averageY = AggregationBuilders.avg("averageY").field(y);
AvgAggregationBuilder averageZ = AggregationBuilders.avg("averageZ").field(z);

// NOTE(review): no time zone is set, so bucket keys are presumably formatted in UTC,
// while SQL HOUR() uses the database session time zone. If a zone is needed, use a valid
// ID such as "GMT+8:00"; "GMT%2B8" is URL-encoded and is not a valid zone ID.
// NOTE(review): a range longer than one day yields one bucket per calendar hour, so the
// same "HH" value can appear several times, unlike SQL GROUP BY HOUR(create_time).
AggregationBuilder groupTime = AggregationBuilders.dateHistogram("khour")
        // Group documents by this date field.
        .field("create_time")
        // Bucket width; DateHistogramInterval also offers day, month, year, ...
        .dateHistogramInterval(DateHistogramInterval.HOUR)
        // Format of the bucket key in the response: two-digit hour.
        .format("HH")
        // Empty hours are skipped by the caller, so do not ask ES to return them.
        .minDocCount(1L)
        .subAggregation(averageX)
        .subAggregation(averageY)
        .subAggregation(averageZ);
searchSourceBuilder.aggregation(groupTime);

final SearchResponse shockDataResponse =
        EsClintUtil.execSearch(restHighLevelClient, searchSourceBuilder, "dev_shock_data");

ParsedDateHistogram khour = shockDataResponse.getAggregations().get("khour");
for (Histogram.Bucket bucket : khour.getBuckets()) {
    Avg xAvg = bucket.getAggregations().get("averageX");
    Avg yAvg = bucket.getAggregations().get("averageY");
    Avg zAvg = bucket.getAggregations().get("averageZ");

    final GroupByHour groupByHour = new GroupByHour();
    groupByHour.setHour(Integer.parseInt(bucket.getKeyAsString()));
    groupByHour.setAverageX(xAvg.getValue());
    groupByHour.setAverageY(yAvg.getValue());
    groupByHour.setAverageZ(zAvg.getValue());
    groupByHours.add(groupByHour);
}
return groupByHours;
欢迎大家前来提需求!

技术
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信