ES提供了多种编程语言的链接方式,有Java API,PHP API,.NET API

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-supported-apis.html

下面阐述ES支持的客户端链接方式:

1:REST API

可以通过浏览器请求get方法进行链接;

利用Postman等工具发起REST请求;

Java发起HttpClient请求;

2:Transport链接

通过socket链接,用官网一个的TransPort客户端,底层是netty

特别提示:

ES在7.0版本开始将废弃TransportClient,8.0版本开始将完全移除TransportClient

取而代之的是High Level REST Client。
Java High Level REST Client 为高级别的Rest客户端,基于低级别的REST客户端,增加了编组请求JSON串,解析响应JSON串等相关API,使用的版本需要和ES服务端的版本保持一致,否则会有版本问题。

首先在使用Java REST Client的时候引入maven的Jar包依赖:

            <!--elasticsearch 高级操作 依赖-->
         <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.9.1</version>
        </dependency>

直接上代码操作

package com.example.jsoupdome.elastic;

import com.example.jsoupdome.common.utils.JsonUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author zhang
 * @version 1.0
 * @date 2021/10/27 15:11
 */
@Configuration
@Slf4j
public class Client {
    @Value("${es.url}")
    private String esUrl;
    private  RestHighLevelClient client;
    //测试使用构造
    public  Client(){
        client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("192.168.126.128", 9200, "http")));

    }
//    @Bean
    RestHighLevelClient configRestHighLevelClient() throws Exception {

        String[] esUrlArr = esUrl.split(",");

        List<HttpHost> httpHosts = new ArrayList<>();
        for(String es : esUrlArr){
            String[] esUrlPort = es.split(":");
            httpHosts.add(new HttpHost(esUrlPort[0], Integer.parseInt(esUrlPort[1]), "http"));
        }
        client = new RestHighLevelClient(
                RestClient.builder(httpHosts.toArray(new HttpHost[0]))
        );
        CreateIndexRequest request = new CreateIndexRequest("");//创建索引
        ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse createIndexResponse) {
                //如果执行成功,则调用onResponse方法;

            }
            @Override
            public void onFailure(Exception e) {
                //如果失败,则调用onFailure方法。
                log.error("如果失败,则调用onFailure方法。"+e);
            }
        };
        client.indices().createAsync(request,null,listener);//要执行的CreateIndexRequest和执行完成时要使用的ActionListener
        return client;
    }


    /**
     *  新增,修改文档
     * @param indexName  索引
     * @param type mapping type
     * @param id 文档id
     * @param jsonStr 文档数据
     */
    public   void addData(String indexName,String type ,String id,String jsonStr) {
        try {
            // 1、创建索引请求  //索引  // mapping type  //文档id
            IndexRequest request = new IndexRequest(indexName, type, id);     //文档id
            // 2、准备文档数据
            // 直接给JSON串
            request.source(jsonStr, XContentType.JSON);

            //4、发送请求
            IndexResponse indexResponse = null;
            try {
                // 同步方式
                indexResponse = client.index(request,RequestOptions.DEFAULT);
            } catch (ElasticsearchException e) {
                // 捕获,并处理异常
                //判断是否版本冲突、create但文档已存在冲突
                if (e.status() == RestStatus.CONFLICT) {
                    System.out.println("冲突了,请在此写冲突处理逻辑!" + e.getDetailedMessage());
                }
            }
            //5、处理响应
            if (indexResponse != null) {
                String index1 = indexResponse.getIndex();
                String type1 = indexResponse.getType();
                String id1 = indexResponse.getId();
                long version1 = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("新增文档成功!" + index1 + type1 + id1 + version1);
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("修改文档成功!");
                }
                // 分片处理信息
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                    System.out.println("分片处理信息.....");
                }
                // 如果有分片副本失败,可以获得失败原因信息
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        String reason = failure.reason();
                        System.out.println("副本失败原因:" + reason);
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     *  获取文档
     * @param indexName  索引
     * @param fields 为特定字段配置源排除
     * @param id 文档id
     */
    public   void getData(String indexName,String fields ,String id) {
        // 1、通过索引和文档id
        GetRequest request = new GetRequest(indexName, id);
        //2.对特定的稳当检索
        request.storedFields(fields);
        //4.在检索文档之前执行刷新(false默认情况下)
        request.refresh(true);
        /**
         * 异步方法不阻塞并立即返回。一旦完成ActionListener使用 响应方法如果执行成功完成或使用onFailure方法ifit失败。
         * 失败场景和预期异常与同步执行情况相同。
         * */
        new Client().client.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
             @Override
             public void onResponse(GetResponse getResponse) {
                 if (getResponse.isExists()) {
                     Map map = JsonUtil.fromJson(getResponse.toString(), Map.class);
                     log.info("111111111111");
                     log.info(map.toString());
                 } else {
                     log.error("找不到文档");
                 }
             }
             @Override
             public void onFailure(Exception e) {
                //如果失败,则调用onFailure方法。
                 log.error("执行失败"+e);
             }
         });
    }
    /**
     *  获取文档
     * @param indexName  索引
     * @param id 文档id
     */
    public  void deleteDate(String indexName,String id) {
        // 1、通过索引和文档id
        DeleteRequest request = new DeleteRequest(
                indexName,
                id);
        /**
         * 异步方法不阻塞并立即返回。一旦完成ActionListener使用 响应方法如果执行成功完成或使用onFailure方法ifit失败。
         * 失败场景和预期异常与同步执行情况相同。
         * */
     client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
         @Override
         public void onResponse(DeleteResponse deleteResponse) {
            log.info("删除成功");
         }
         @Override
         public void onFailure(Exception e) {
             log.info("删除失败");
         }
     });

    }




    public static void main(String[] args) {
        Client client = new Client();
        //1.通过id获取指定文档
            client.getData("content",null,"162");

        //2.删除指定索引下的id
        client.deleteDate("content","162");
    }
}


```bash


***

## 解析json

***

```bash
public class JsonUtil {
    private static final Logger logger = LoggerFactory.getLogger(JsonUtil.class);
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    /**
     * 对象映射
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static {
        OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        OBJECT_MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        OBJECT_MAPPER.setDateFormat(new SimpleDateFormat(DATE_FORMAT));
        OBJECT_MAPPER.setSerializationInclusion(Include.NON_NULL);
    }


    /**
     * Java对象转换为Json串
     *
     * @param obj Java对象
     * @return Json串
     */
    public static String toJson(Object obj) {
        String rst;
        if (obj == null ||  obj instanceof String) {
            return (String) obj;
        }
        try {
            rst = OBJECT_MAPPER.writeValueAsString(obj);
        } catch (Exception e) {
            logger.error("将Java对象转换成Json串出错!");
            throw new RuntimeException("将Java对象转换成Json串出错!", e);
        }
        return rst;
    }

    /**
     * Json串转换为Java对象
     *
     * @param json Json串
     * @param type Java对象类型
     * @return Java对象
     */
    public static <T> T fromJson(String json, Class<T> type) {
        T rst;
        try {
            rst = OBJECT_MAPPER.readValue(json, type);
        } catch (Exception e) {
            logger.error("Json串转换成对象出错:{}", json);
            throw new RuntimeException("Json串转换成对象出错!", e);
        }
        return rst;
    }

    /**
     * Json串转换为Java对象
     * <br>使用引用类型,适用于List&ltObject&gt、Set&ltObject&gt 这种无法直接获取class对象的场景
     * <br>使用方法:TypeReference ref = new TypeReference&ltList&ltInteger&gt&gt(){};
     *
     * @param json    Json串
     * @param typeRef Java对象类型引用
     * @return Java对象
     */
    @SuppressWarnings("unchecked")
    public static <T> T fromJson(String json, TypeReference<T> typeRef) {
        T rst;
        try {
            rst = OBJECT_MAPPER.readValue(json, typeRef);
        } catch (Exception e) {
            logger.error("Json串转换成对象出错:{}", json);
            throw new RuntimeException("Json串转换成对象出错!", e);
        }
        return rst;
    }

    @SuppressWarnings("unchecked")
    public static HashMap<String, Object> fromJsonToMap(String json) {
        HashMap<String, Object> map = new HashMap<String, Object>();
        try {
            map = OBJECT_MAPPER.readValue(json, map.getClass());
        } catch (IOException e) {
            logger.error("Json串转换成对象出错:{}", json);
        }
        return map;
    }

    @SuppressWarnings("unchecked")
    public static HashMap<String, Object> toMap(String json) {
        HashMap<String, Object> map;
        try {
            map = OBJECT_MAPPER.readValue(json, HashMap.class);
        } catch (Exception e) {
            map = null;
            logger.error("Json串转换成对象出错:{}", json);
        }
        return map;
    }



    public static void main(String[] args) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("id",1111);
        map.put("name","张三");
        String s = JsonUtil.toJson(map);
        Map map1 = JsonUtil.fromJson(s, Map.class);
        System.out.println(map1);
    }
}

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐