Spring Boot Integration with Elasticsearch 7.4 in Practice (2)

Rothschil 2019-06-20 16:31:00
springboot

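The content is planned as three chapters: building the runtime environment, managing indices from a web application, and managing data from a web application.

Specifically:
[All of these operations are performed on CentOS, mainly to get familiar with the Elasticsearch environment]

[Operating on indices from Java in a Spring Boot environment]

[Managing data in a Spring Boot environment]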


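2. Spring Boot Integration

The development tool used here is IDEA 2019.2. Creating the Maven project and the other routine steps are not covered; look them up online if you are unfamiliar with them.

2.1. POM Configuration

I integrate through elasticsearch-rest-high-level-client. There is a pitfall here that I did not notice at first and that cost me a lot of time: the transitive elasticsearch and elasticsearch-rest-client dependencies must be excluded and then declared explicitly at version 7.4.0. I did not choose spring-boot-starter-data-elasticsearch because, at the time of writing, the latest starter was still on 6.x and did not integrate Elasticsearch 7.4.0, which causes many version conflicts in use. Keep this in mind when making your own choice.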

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.4.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.4.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.4.0</version>
</dependency>

2.2. yml Configuration

server:
  port: 9090
spring:
  datasource:
    name: mysql
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/springboot?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC
    username: root
    password: 123456
    druid:
      initial-size: 5
      min-idle: 5
      max-active: 20
      max-wait: 30000
      time-between-eviction-runs-millis: 60000
      min-evictable-idle-time-millis: 300000
      validation-query: select 1
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
      pool-prepared-statements: false
      max-pool-prepared-statement-per-connection-size: 20
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=6000
es:
  host: 192.168.147.132
  port: 9200
  scheme: http

mybatis:
  mapperLocations: classpath:mapper/**/*.xml

The es node defines the address and port of Elasticsearch; change the values to match your own environment.
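As a rough illustration of how the three es values fit together, the sketch below combines scheme, host, and port into the base address of the Elasticsearch node. The class name EsEndpoint and its method are hypothetical and not part of the demo project; the sketch assumes the values have already been read from the configuration.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of the demo project: combines the three
// values of the es node into the base address of the Elasticsearch node.
final class EsEndpoint {

    private EsEndpoint() {
    }

    // Builds e.g. http://192.168.147.132:9200 from scheme, host and port.
    static URI baseUri(String scheme, String host, int port) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        try {
            // The multi-argument constructor validates each component.
            return new URI(scheme, null, host, port, null, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("invalid es settings", e);
        }
    }
}
```

With the high-level client, the same three values would typically be passed to new HttpHost(host, port, scheme) when the RestHighLevelClient is built through RestClient.builder(...).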

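2.3. Core Operation Class

To standardize index management, all operations are wrapped in a single base class that implements create, delete, update, and query operations on indices. It also covers inserting and deleting data, both one document at a time and in bulk. This avoids writing a separate implementation for every index, eliminates redundant code, and keeps the integration minimally invasive to the structure of the code.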

package xyz.wongs.weathertop.base.dao;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import xyz.wongs.weathertop.base.entiy.ElasticEntity;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@Slf4j
@Component
public class BaseElasticDao {

    @Autowired
    RestHighLevelClient restHighLevelClient;

    /** Creates an index
     * @author WCNGS@QQ.COM
     * @date 2019/10/17 17:30
     * @param idxName index name
     * @param idxSQL index description (mapping definition as JSON)
     */
    public void createIndex(String idxName, String idxSQL) {
        try {
            // Skip creation when the index already exists
            if (this.indexExist(idxName)) {
                log.error(" idxName={} already exists, idxSql={}", idxName, idxSQL);
                return;
            }
            CreateIndexRequest request = new CreateIndexRequest(idxName);
            buildSetting(request);
            request.mapping(idxSQL, XContentType.JSON);
            // request.settings(...) can be used to specify the settings manually
            CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            if (!res.isAcknowledged()) {
                throw new RuntimeException("Index initialization failed");
            }
        } catch (Exception e) {
            // Propagate the failure to the caller instead of terminating the JVM
            throw new RuntimeException(e);
        }
    }

    /** Checks whether an index exists
     * @author WCNGS@QQ.COM
     * @date 2019/10/17 17:27
     * @param idxName index name
     * @return true if the index exists
     */
    public boolean indexExist(String idxName) throws Exception {
        GetIndexRequest request = new GetIndexRequest(idxName);
        request.local(false);
        request.humanReadable(true);
        request.includeDefaults(false);

        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    }

    /** Sets the number of shards and replicas
     * @author WCNGS@QQ.COM
     * @date 2019/10/17 19:27
     * @param request index creation request to configure
     */
    public void buildSetting(CreateIndexRequest request) {
        request.settings(Settings.builder().put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 2));
    }

    /** Inserts a single document, or overwrites it if the id already exists
     * @author WCNGS@QQ.COM
     * @date 2019/10/17 17:27
     * @param idxName index name
     * @param entity document to store
     */
    public void insertOrUpdateOne(String idxName, ElasticEntity entity) {
        IndexRequest request = new IndexRequest(idxName);
        request.id(entity.getId());
        request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
        try {
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Inserts data in bulk
     * @author WCNGS@QQ.COM
     * @date 2019/10/17 17:26
     * @param idxName index name
     * @param list entities to insert
     */
    public void insertBatch(String idxName, List<ElasticEntity> list) {
        BulkRequest request = new BulkRequest();
        list.forEach(item -> request.add(new IndexRequest(idxName).id(item.getId())
                .source(JSON.toJSONString(item.getData()), XContentType.JSON)));
        try {
            restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Deletes data in bulk
     * @author WCNGS@QQ.COM
     * @date 2019/10/17 17:14
     * @param idxName index name
     * @param idList ids of the documents to delete
     */
    public <T> void deleteBatch(String idxName, Collection<T> idList) {
        BulkRequest request = new BulkRequest();
        idList.forEach(item -> request.add(new DeleteRequest(idxName, item.toString())));
        try {
            restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Runs a search and maps every hit to the given class
     * @author WCNGS@QQ.COM
     * @date 2019/10/17 17:14
     * @param idxName index name
     * @param builder query parameters
     * @param c class of the result objects
     * @return java.util.List<T>
     */
    public <T> List<T> search(String idxName, SearchSourceBuilder builder, Class<T> c) {
        SearchRequest request = new SearchRequest(idxName);
        request.source(builder);
        try {
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            SearchHit[] hits = response.getHits().getHits();
            List<T> res = new ArrayList<>(hits.length);
            for (SearchHit hit : hits) {
                res.add(JSON.parseObject(hit.getSourceAsString(), c));
            }
            return res;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Deletes an index
     * @author WCNGS@QQ.COM
     * @date 2019/10/17 17:13
     * @param idxName index name
     */
    public void deleteIndex(String idxName) {
        try {
            // Nothing to delete when the index does not exist
            if (!this.indexExist(idxName)) {
                log.error(" idxName={} does not exist", idxName);
                return;
            }
            restHighLevelClient.indices().delete(new DeleteIndexRequest(idxName), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Deletes all documents that match a query
     * @author WCNGS@QQ.COM
     * @date 2019/10/17 17:13
     * @param idxName index name
     * @param builder query that selects the documents to delete
     */
    public void deleteByQuery(String idxName, QueryBuilder builder) {
        DeleteByQueryRequest request = new DeleteByQueryRequest(idxName);
        request.setQuery(builder);
        // Set the batch size; the maximum is 10000
        request.setBatchSize(10000);
        request.setConflicts("proceed");
        try {
            restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
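BaseElasticDao relies on ElasticEntity, whose definition is not shown in this article. Judging only from the calls to getId() and getData() in the class above, a minimal sketch might look like the following. The Map payload type is an assumption, and the real class in the demo project may differ.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Assumed shape of ElasticEntity, inferred from getId() and getData() above.
final class ElasticEntity {

    private final String id;                 // document id, passed to IndexRequest.id(...)
    private final Map<String, Object> data;  // document body, serialized to JSON by the DAO

    ElasticEntity(String id, Map<String, Object> data) {
        this.id = id;
        // Defensive copy so later changes by the caller do not leak into the entity.
        this.data = Collections.unmodifiableMap(new LinkedHashMap<>(data));
    }

    String getId() {
        return id;
    }

    Map<String, Object> getData() {
        return data;
    }
}
```

Keeping the id separate from the payload lets the DAO choose the document id itself, so indexing the same id again overwrites the existing document.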

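3. In Practice

With the integration above, Elasticsearch is now wired into the project, and a base class wraps all the operations we are likely to need. Next, we walk through that base class one operation at a time.

3.1. Index Management

Because of the createIndex method in the BaseElasticDao class, I wrapped the index name and the index SQL (the mapping definition) in a value object at the Controller layer. For details, see xyz.wongs.weathertop.palant.vo.IdxVo in the GitHub demo source code.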
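IdxVo itself is not reproduced in this article. Going only by the getIdxName() and getIdxSql() accessors that the controller calls, a minimal sketch could look like this. Treating idxSql as a nested Map that mirrors the mapping JSON is an assumption, and the sample field name location_id is purely illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Assumed shape of the request object; only the two accessors are known
// from the controller code, everything else is illustrative.
final class IdxVo {

    private String idxName;              // index name, must be lowercase
    private Map<String, Object> idxSql;  // mapping definition, later serialized to JSON

    String getIdxName() { return idxName; }
    void setIdxName(String idxName) { this.idxName = idxName; }
    Map<String, Object> getIdxSql() { return idxSql; }
    void setIdxSql(Map<String, Object> idxSql) { this.idxSql = idxSql; }

    // Builds a sample request equivalent to the JSON body
    // {"idxName":"idx_location","idxSql":{"properties":{"location_id":{"type":"long"}}}}
    static IdxVo sample() {
        Map<String, Object> field = new LinkedHashMap<>();
        field.put("type", "long");
        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("location_id", field);
        Map<String, Object> mapping = new LinkedHashMap<>();
        mapping.put("properties", properties);

        IdxVo vo = new IdxVo();
        vo.setIdxName("idx_location");
        vo.setIdxSql(mapping);
        return vo;
    }
}
```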

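3.1.1. Creating an Index

Before creating an index we first check whether it already exists; if it does, creation is not allowed. My example specifies the indexName and the Settings manually, so pay particular attention to them as you read. One more point: the indexName must be lowercase, otherwise an error occurs during creation (see the official index creation documentation).

[Figure: uppercase index name]

The detailed implementation is as follows: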


/**
 * @Description Creates an Elasticsearch index
 * @param idxVo index name and mapping definition
 * @return xyz.wongs.weathertop.base.message.response.ResponseResult
 * @date 2019/11/19 11:07
 */
@RequestMapping(value = "/createIndex", method = RequestMethod.POST)
public ResponseResult createIndex(@RequestBody IdxVo idxVo) {
    ResponseResult response = new ResponseResult();
    try {
        // Create the index only if it does not exist yet; otherwise refuse
        if (!baseElasticDao.indexExist(idxVo.getIdxName())) {
            String idxSql = JSONObject.toJSONString(idxVo.getIdxSql());
            log.warn(" idxName={}, idxSql={}", idxVo.getIdxName(), idxSql);
            baseElasticDao.createIndex(idxVo.getIdxName(), idxSql);
        } else {
            response.setStatus(false);
            response.setCode(ResponseCode.DUPLICATEKEY_ERROR_CODE.getCode());
            response.setMsg("The index already exists; creation is not allowed");
        }
    } catch (Exception e) {
        response.setStatus(false);
        response.setCode(ResponseCode.ERROR.getCode());
        response.setMsg(ResponseCode.ERROR.getMsg());
    }
    return response;
}
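The lowercase rule mentioned at the start of this section can be checked before the request ever reaches Elasticsearch. The helper below is a sketch and not part of the demo project. The class name IndexNames is hypothetical, and it covers only a few of the naming rules (non-empty, lowercase, no leading -, _ or +, no whitespace), not the complete set enforced by the server.

```java
import java.util.Locale;

// Hypothetical pre-check for index names; covers only part of the server-side rules.
final class IndexNames {

    private IndexNames() {
    }

    // Returns true when the name passes the basic checks listed above.
    static boolean isValid(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        // Uppercase letters are rejected by Elasticsearch.
        if (!name.equals(name.toLowerCase(Locale.ROOT))) {
            return false;
        }
        char first = name.charAt(0);
        if (first == '-' || first == '_' || first == '+') {
            return false;
        }
        // Whitespace is not allowed anywhere in the name.
        for (int i = 0; i < name.length(); i++) {
            if (Character.isWhitespace(name.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}
```

In the controller, a check like this could run before indexExist and return an error response for an invalid name, instead of waiting for the server to reject it.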

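Creating an index requires shard settings. Here they are set through Settings.Builder; a custom JSON definition works as well, but space is limited so it is not demonstrated in this article. See the xyz.wongs.weathertop.base.service.BaseElasticService.buildSetting method, which sets the default values.

index.number_of_shards: number of primary shards

index.number_of_replicas: number of replicas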


/** Sets the number of shards and replicas
 * @author WCNGS@QQ.COM
 * @date 2019/10/17 19:27
 * @param request index creation request to configure
 */
public void buildSetting(CreateIndexRequest request) {
    request.settings(Settings.builder().put("index.number_of_shards", 3)
            .put("index.number_of_replicas", 2));
}
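The JSON alternative mentioned above is not demonstrated in the article. As a rough sketch, the same two values can be rendered as a JSON string; the class name IndexSettingsJson is hypothetical. A string like this can be handed to the request.settings(String, XContentType) overload in place of the Settings.Builder call, but that wiring is left out here so the example runs without the Elasticsearch client on the classpath.

```java
// Hypothetical helper: renders shard and replica counts as a settings JSON string.
final class IndexSettingsJson {

    private IndexSettingsJson() {
    }

    static String of(int shards, int replicas) {
        if (shards < 1) {
            throw new IllegalArgumentException("at least one primary shard is required");
        }
        if (replicas < 0) {
            throw new IllegalArgumentException("replica count cannot be negative");
        }
        // Only integers are interpolated, so no JSON escaping is needed.
        return "{\"index\":{\"number_of_shards\":" + shards
                + ",\"number_of_replicas\":" + replicas + "}}";
    }
}
```

Calling IndexSettingsJson.of(3, 2) yields the same values as the builder version in buildSetting.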

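Now call the Controller with Postman; the index is created successfully.

[Figure: adding an index]

Then run **curl -H "Content-Type: application/json" -X GET "http://localhost:9200/_cat/indices?v"** on the command line. The result looks like this: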


[elastic@localhost elastic]$ curl -H "Content-Type: application/json" -X GET "http://localhost:9200/_cat/indices?v"
health status index        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   twitter      scSSD1SfRCio4F77Hh8aqQ   3   2          2            0      8.3kb          8.3kb
yellow open   idx_location _BJ_pOv0SkS4tv-EC3xDig   3   2          1            0        4kb            4kb
yellow open   wongs        uT13XiyjSW-VOS3GCqao8w   3   2          1            0      3.4kb          3.4kb
yellow open   idx_locat    Kr3wGU7JT_OUrRJkyFSGDw   3   2          3            0     13.2kb         13.2kb
yellow open   idx_copy_to  HouC9s6LSjiwrJtDicgY3Q   3   2          1            0        4kb            4kb
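If this check needs to be automated, the text returned by _cat/indices?v can be split into columns. The sketch below parses one data row into a small value object. The class name CatIndexRow is hypothetical, and it assumes the ten-column layout shown above with every column filled in and separated by whitespace.

```java
// Hypothetical parser for one data row of the _cat/indices?v output shown above.
final class CatIndexRow {

    final String health;
    final String status;
    final String index;
    final int primaries;
    final int replicas;
    final long docsCount;

    private CatIndexRow(String health, String status, String index,
                        int primaries, int replicas, long docsCount) {
        this.health = health;
        this.status = status;
        this.index = index;
        this.primaries = primaries;
        this.replicas = replicas;
        this.docsCount = docsCount;
    }

    // Expects: health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
    static CatIndexRow parse(String line) {
        String[] cols = line.trim().split("\\s+");
        if (cols.length != 10) {
            throw new IllegalArgumentException("expected 10 columns but got " + cols.length);
        }
        return new CatIndexRow(cols[0], cols[1], cols[2],
                Integer.parseInt(cols[4]), Integer.parseInt(cols[5]), Long.parseLong(cols[6]));
    }
}
```

The header row is not a data row and should be skipped before parse is called.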

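This shows that the index was created. Verifying from the command line every time is a bit tedious, though. Since I already have a web service, why not verify directly over HTTP?

3.1.2. Viewing an Index

We write a service that answers queries over HTTP GET. It returns TRUE if the index exists and FALSE otherwise.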

/**
 * @Description Checks whether an index exists; TRUE if it exists, FALSE otherwise
 * @param index index name
 * @return xyz.wongs.weathertop.base.message.response.ResponseResult
 * @date 2019/11/19 18:48
 */
@RequestMapping(value = "/exist/{index}", method = RequestMethod.GET)
public ResponseResult indexExist(@PathVariable(value = "index") String index) {
    ResponseResult response = new ResponseResult();
    try {
        // indexExist is the method defined on BaseElasticDao in section 2.3
        if (!baseElasticDao.indexExist(index)) {
            log.error("index={} does not exist", index);
            response.setCode(ResponseCode.RESOURCE_NOT_EXIST.getCode());
            response.setMsg(ResponseCode.RESOURCE_NOT_EXIST.getMsg());
        } else {
            response.setMsg(" The index already exists, " + index);
        }
    } catch (Exception e) {
        response.setCode(ResponseCode.NETWORK_ERROR.getCode());
        response.setMsg(" The call to Elasticsearch failed!");
        response.setStatus(false);
    }
    return response;
}
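Once the application is running, the endpoint can be called from any HTTP client. The sketch below uses only the JDK. It assumes the application listens on port 9090 as configured in the yml file and that the controller has no class-level path prefix; adjust the base URL if yours does. The class name ExistEndpointClient is hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /exist/{index} endpoint; not part of the demo project.
final class ExistEndpointClient {

    private ExistEndpointClient() {
    }

    // Builds the address of the endpoint, e.g. http://localhost:9090/exist/twitter
    static URI existUri(String baseUrl, String index) {
        String base = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
        return URI.create(base + "/exist/" + index);
    }

    // Sends the GET request and returns the raw response body on a successful status.
    static String fetch(URI uri) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }
}
```

A call such as ExistEndpointClient.fetch(ExistEndpointClient.existUri("http://localhost:9090", "twitter")) would then return whatever the controller serializes for its ResponseResult.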

3.1.3. Deleting an Index

The deletion logic is fairly simple, so I will not go into detail here.

/** Deletes an index
 * @author WCNGS@QQ.COM
 * @date 2019/10/17 17:13
 * @param idxName index name
 */
public void deleteIndex(String idxName) {
    try {
        // Nothing to delete when the index does not exist
        if (!this.indexExist(idxName)) {
            log.error(" idxName={} does not exist", idxName);
            return;
        }
        restHighLevelClient.indices().delete(new DeleteIndexRequest(idxName), RequestOptions.DEFAULT);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

4. Source Code

GitHub demo source code; remember to give it a star.

Gitee demo source code; remember to give it a star.