← 返回首页
JavaAPI的方式操作ES的索引库
发表时间:2023-12-30 15:40:40
JavaAPI的方式操作ES的索引库

JavaAPI的方式操作ES的索引库。

1.JavaAPI的方式操作ES的索引库

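针对Java API,目前ES提供了两个Java REST Client版本:

- Java Low Level REST Client:低级别的REST客户端,通过HTTP与ES集群通信,请求的封装和响应的解析需要自己处理。
- Java High Level REST Client:高级别的REST客户端,基于Low Level REST Client封装,提供了面向具体API的方法,使用起来更方便。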

需要注意:Java High Level REST Client 与ES的版本是绑定的,如果ES集群后期升级到了8.x版本,那么之前基于ES 7.x版本开发的代码也需要随之升级。

版本建议:

- 如果考虑到代码后期的兼容性,建议使用Java Low Level REST Client。
- 如果考虑到易用性,建议使用Java High Level REST Client。

本节我们使用Java High Level REST Client。

创建springboot项目:db_elasticsearch

在pom.xml文件中添加ES的依赖和日志的依赖。


<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.3.12.RELEASE</spring-boot.version>
    <elasticsearch.version>7.13.4</elasticsearch.version>
    <!-- 2.14.1 is affected by Log4Shell (CVE-2021-44228); 2.17.1 is a patched release that still runs on Java 8 -->
    <log4j.version>2.17.1</log4j.version>
</properties>

<dependencies>
    <!-- Elasticsearch Java High Level REST Client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>

    <!-- Low-level REST client and ES core, pinned to the same version as the high-level client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>

    <!-- Logging: declare log4j-api explicitly so it cannot resolve to an older transitive version than log4j-core -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- ... other dependencies of the project ... -->
</dependencies>

在resources目录下添加log4j2.properties。

# Console appender named "console"
appender.console.type = Console
appender.console.name = console
# Line format: [ISO-8601 timestamp][level, padded to 5 chars][logger name, padded to 25 chars] marker + message + newline
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %marker%m%n

# Root logger: level "info", routed to the "console" appender defined above
rootLogger.level = info
rootLogger.appenderRef.console.ref = console

索引库的操作(创建、删除)

package com.simoniu.db_elasticsearch.index;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;

import java.io.IOException;

/**
 * Demonstrates index-level operations against Elasticsearch:
 * <ol>
 *   <li>creating an index</li>
 *   <li>deleting an index</li>
 * </ol>
 * Created by simoniu
 */
public class EsIndexOperateDemo {

    /** Name of the index that this demo creates and deletes. */
    private static final String INDEX_NAME = "java_test";

    /**
     * Opens a client against the three cluster nodes, runs the selected index operation
     * and closes the client.
     *
     * @param args unused
     * @throws IOException if a request fails or the client cannot be closed
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the client even when one of the operations below throws
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("master", 9200, "http"),
                        new HttpHost("slave1", 9200, "http"),
                        new HttpHost("slave2", 9200, "http")))) {

            // Create the index
            createIndex(client);

            // Delete the index (uncomment to run)
            //deleteIndex(client);
        }
    }

    /**
     * Deletes the {@code java_test} index.
     *
     * @param client an open high-level REST client
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    private static void deleteIndex(RestHighLevelClient client) throws IOException {
        DeleteIndexRequest deleteRequest = new DeleteIndexRequest(INDEX_NAME);
        client.indices().delete(deleteRequest, RequestOptions.DEFAULT);
    }

    /**
     * Creates the {@code java_test} index with 3 shards.
     *
     * @param client an open high-level REST client
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    private static void createIndex(RestHighLevelClient client) throws IOException {
        CreateIndexRequest createRequest = new CreateIndexRequest(INDEX_NAME);
        // Index settings: number of shards
        createRequest.settings(Settings.builder()
                .put("index.number_of_shards", 3)
        );
        client.indices().create(createRequest, RequestOptions.DEFAULT);
    }
}

索引数据的操作(增、删、改、查、Bulk批量操作)

package com.simoniu.db_elasticsearch.data;

import org.apache.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * 针对ES中索引数据的操作
 * 增删改查
 * Created by simoniu
 */
public class EsDataOperateDemo {

    private static Logger logger = LogManager.getLogger(EsDataOperateDemo.class);

    public static void main(String[] args) throws Exception {
        //获取RestClient连接
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("master", 9200, "http"),
                        new HttpHost("slave1", 9200, "http"),
                        new HttpHost("slave2", 9200, "http")));

        //创建索引
        //addIndexByJson(client);
        //addIndexByMap(client);

        //查询索引
        //getIndex(client);
        //getIndexByFiled(client);

        //更新索引
        //注意:可以使用创建索引直接完整更新已存在的数据
        //getIndex(client);
        //updateIndexByPart(client);//局部更新
        //getIndex(client);

        //删除索引
        //deleteIndex(client);
        //getIndex(client);

        //Bulk批量操作
        //bulkIndex(client);
        //getIndex(client);

        //关闭连接
        client.close();
    }

    private static void bulkIndex(RestHighLevelClient client) throws IOException {
        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest("java_test").id("20")
                .source(XContentType.JSON, "name", "messi", "age", "38"));
        request.add(new DeleteRequest("java_test", "10"));//id为10的数据不存在,但是执行删除是不会报错的
        request.add(new UpdateRequest("java_test", "11")
                .doc(XContentType.JSON, "age", 19));
        request.add(new UpdateRequest("java_test", "12")//id为12的数据不存在,这一条命令在执行的时候会失败
                .doc(XContentType.JSON, "age", 19));
        //执行
        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
        //如果Bulk中的个别语句出错不会导致整个Bulk执行失败,所以可以在这里判断一下是否有返回执行失败的信息
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) {
                BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                logger.error("Bulk中出现了异常:" + failure);
            }
        }
    }

    private static void deleteIndex(RestHighLevelClient client) throws IOException {
        DeleteRequest request = new DeleteRequest("java_test", "10");
        //执行
        client.delete(request, RequestOptions.DEFAULT);
    }

    private static void updateIndexByPart(RestHighLevelClient client) throws IOException {
        UpdateRequest request = new UpdateRequest("java_test", "10");
        String jsonString = "{\"age\":45}";
        request.doc(jsonString, XContentType.JSON);
        //执行
        client.update(request, RequestOptions.DEFAULT);
    }

    private static void getIndexByFiled(RestHighLevelClient client) throws IOException {
        GetRequest request = new GetRequest("java_test", "10");
        //只查询部分字段
        String[] includes = new String[]{"name"};//指定包含哪些字段
        String[] excludes = Strings.EMPTY_ARRAY;//指定多滤掉哪些字段
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        request.fetchSourceContext(fetchSourceContext);
        //执行
        GetResponse response = client.get(request, RequestOptions.DEFAULT);
        //通过response获取index、id、文档详细内容(source)
        String index = response.getIndex();
        String id = response.getId();
        if (response.isExists()) {//如果没有查询到文档数据,则isExists返回false
            //获取json字符串格式的文档结果
            String sourceAsString = response.getSourceAsString();
            System.out.println(sourceAsString);
            //获取map格式的文档结果
            Map<String, Object> sourceAsMap = response.getSourceAsMap();
            System.out.println(sourceAsMap);
        } else {
            logger.warn("没有查询到索引库{}中id为{}的文档!", index, id);
        }
    }

    private static void getIndex(RestHighLevelClient client) throws IOException {
        GetRequest request = new GetRequest("java_test", "20");
        //执行
        GetResponse response = client.get(request, RequestOptions.DEFAULT);
        //通过response获取index、id、文档详细内容(source)
        String index = response.getIndex();
        String id = response.getId();
        if (response.isExists()) {//如果没有查询到文档数据,则isExists返回false
            //获取json字符串格式的文档结果
            String sourceAsString = response.getSourceAsString();
            System.out.println(sourceAsString);
            //获取map格式的文档结果
            Map<String, Object> sourceAsMap = response.getSourceAsMap();
            System.out.println(sourceAsMap);
        } else {
            logger.warn("没有查询到索引库{}中id为{}的文档!", index, id);
        }
    }

    private static void addIndexByMap(RestHighLevelClient client) throws IOException {
        IndexRequest request = new IndexRequest("java_test");
        request.id("11");
        HashMap<String, Object> jsonMap = new HashMap<String, Object>();
        jsonMap.put("name", "tom");
        jsonMap.put("age", 17);
        request.source(jsonMap);
        //执行
        client.index(request, RequestOptions.DEFAULT);
    }

    private static void addIndexByJson(RestHighLevelClient client) throws IOException {
        IndexRequest request = new IndexRequest("java_test");
        request.id("10");
        String jsonString = "{" +
                "\"name\":\"jessic\"," +
                "\"age\":20" +
                "}";
        request.source(jsonString, XContentType.JSON);
        //执行
        client.index(request, RequestOptions.DEFAULT);
    }
}