JavaAPI的方式操作ES的索引库。
针对Java API,目前ES提供了两个Java REST Client版本:
Java Low Level REST Client 低级别的REST客户端,通过HTTP与集群交互,用户需自己组装请求JSON串,以及解析响应JSON串。兼容所有Elasticsearch版本。这种方式其实就相当于使用Java对前面讲的REST API做了一层简单的封装,前面我们是使用的CURL这个工具执行的,现在是使用Java代码模拟执行HTTP请求了。
Java High Level REST Client 高级别的REST客户端,基于低级别的REST客户端进行了封装,增加了组装请求JSON串、解析响应JSON串等相关API,开发代码使用的ES版本需要和集群中的ES版本一致,否则会有版本冲突问题。这种方式是从ES 6.0版本开始加入的,目的是以Java面向对象的方式进行请求、响应处理。 高级别的REST客户端会兼容高版本的ES集群,例如:使用ES7.0版本开发的代码可以和任何7.x版本的ES集群交互。
如果ES集群后期升级到了8.x版本,那么也要升级之前基于ES 7.0版本开发的代码。
版本建议: - 如果考虑到代码后期的兼容性,建议使用Java Low Level REST Client。 - 如果考虑到易用性,建议使用Java High Level REST Client。
本节我们使用Java High Level REST Client。
创建springboot项目:db_elasticsearch
在pom.xml文件中添加ES的依赖和日志的依赖。
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.12.RELEASE</spring-boot.version>
<elasticsearch.version>7.13.4</elasticsearch.version>
<log4j.version>2.14.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
...
</dependencies>
在resources目录下添加log4j2.properties。
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %marker%m%n
rootLogger.level = info
rootLogger.appenderRef.console.ref = console
索引库的操作(创建、删除)
package com.simoniu.db_elasticsearch.index;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
/**
* 针对ES中索引库的操作
* 1:创建索引库
* 2:删除索引库
* Created by simoniu
*/
public class EsIndexOperateDemo {
public static void main(String[] args) throws Exception{
//获取RestClient连接
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("master", 9200, "http"),
new HttpHost("slave1", 9200, "http"),
new HttpHost("slave2", 9200, "http")));
//创建索引库
createIndex(client);
//删除索引库
//deleteIndex(client);
//关闭连接
client.close();
}
private static void deleteIndex(RestHighLevelClient client) throws IOException {
DeleteIndexRequest deleteRequest = new DeleteIndexRequest("java_test");
//执行
client.indices().delete(deleteRequest, RequestOptions.DEFAULT);
}
private static void createIndex(RestHighLevelClient client) throws IOException {
CreateIndexRequest createRequest = new CreateIndexRequest("java_test");
//指定索引库的配置信息
createRequest.settings(Settings.builder()
.put("index.number_of_shards", 3)//指定分片个数
);
//执行
client.indices().create(createRequest, RequestOptions.DEFAULT);
}
}
索引的操作(增、删、改、查、Bulk批量操作)
package com.simoniu.db_elasticsearch.data;
import org.apache.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* 针对ES中索引数据的操作
* 增删改查
* Created by simoniu
*/
public class EsDataOperateDemo {
private static Logger logger = LogManager.getLogger(EsDataOperateDemo.class);
public static void main(String[] args) throws Exception {
//获取RestClient连接
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("master", 9200, "http"),
new HttpHost("slave1", 9200, "http"),
new HttpHost("slave2", 9200, "http")));
//创建索引
//addIndexByJson(client);
//addIndexByMap(client);
//查询索引
//getIndex(client);
//getIndexByFiled(client);
//更新索引
//注意:可以使用创建索引直接完整更新已存在的数据
//getIndex(client);
//updateIndexByPart(client);//局部更新
//getIndex(client);
//删除索引
//deleteIndex(client);
//getIndex(client);
//Bulk批量操作
//bulkIndex(client);
//getIndex(client);
//关闭连接
client.close();
}
private static void bulkIndex(RestHighLevelClient client) throws IOException {
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("java_test").id("20")
.source(XContentType.JSON, "name", "messi", "age", "38"));
request.add(new DeleteRequest("java_test", "10"));//id为10的数据不存在,但是执行删除是不会报错的
request.add(new UpdateRequest("java_test", "11")
.doc(XContentType.JSON, "age", 19));
request.add(new UpdateRequest("java_test", "12")//id为12的数据不存在,这一条命令在执行的时候会失败
.doc(XContentType.JSON, "age", 19));
//执行
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
//如果Bulk中的个别语句出错不会导致整个Bulk执行失败,所以可以在这里判断一下是否有返回执行失败的信息
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
logger.error("Bulk中出现了异常:" + failure);
}
}
}
private static void deleteIndex(RestHighLevelClient client) throws IOException {
DeleteRequest request = new DeleteRequest("java_test", "10");
//执行
client.delete(request, RequestOptions.DEFAULT);
}
private static void updateIndexByPart(RestHighLevelClient client) throws IOException {
UpdateRequest request = new UpdateRequest("java_test", "10");
String jsonString = "{\"age\":45}";
request.doc(jsonString, XContentType.JSON);
//执行
client.update(request, RequestOptions.DEFAULT);
}
private static void getIndexByFiled(RestHighLevelClient client) throws IOException {
GetRequest request = new GetRequest("java_test", "10");
//只查询部分字段
String[] includes = new String[]{"name"};//指定包含哪些字段
String[] excludes = Strings.EMPTY_ARRAY;//指定多滤掉哪些字段
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
//执行
GetResponse response = client.get(request, RequestOptions.DEFAULT);
//通过response获取index、id、文档详细内容(source)
String index = response.getIndex();
String id = response.getId();
if (response.isExists()) {//如果没有查询到文档数据,则isExists返回false
//获取json字符串格式的文档结果
String sourceAsString = response.getSourceAsString();
System.out.println(sourceAsString);
//获取map格式的文档结果
Map<String, Object> sourceAsMap = response.getSourceAsMap();
System.out.println(sourceAsMap);
} else {
logger.warn("没有查询到索引库{}中id为{}的文档!", index, id);
}
}
private static void getIndex(RestHighLevelClient client) throws IOException {
GetRequest request = new GetRequest("java_test", "20");
//执行
GetResponse response = client.get(request, RequestOptions.DEFAULT);
//通过response获取index、id、文档详细内容(source)
String index = response.getIndex();
String id = response.getId();
if (response.isExists()) {//如果没有查询到文档数据,则isExists返回false
//获取json字符串格式的文档结果
String sourceAsString = response.getSourceAsString();
System.out.println(sourceAsString);
//获取map格式的文档结果
Map<String, Object> sourceAsMap = response.getSourceAsMap();
System.out.println(sourceAsMap);
} else {
logger.warn("没有查询到索引库{}中id为{}的文档!", index, id);
}
}
private static void addIndexByMap(RestHighLevelClient client) throws IOException {
IndexRequest request = new IndexRequest("java_test");
request.id("11");
HashMap<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("name", "tom");
jsonMap.put("age", 17);
request.source(jsonMap);
//执行
client.index(request, RequestOptions.DEFAULT);
}
private static void addIndexByJson(RestHighLevelClient client) throws IOException {
IndexRequest request = new IndexRequest("java_test");
request.id("10");
String jsonString = "{" +
"\"name\":\"jessic\"," +
"\"age\":20" +
"}";
request.source(jsonString, XContentType.JSON);
//执行
client.index(request, RequestOptions.DEFAULT);
}
}