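When building websites, especially data-heavy sites, apps, or mini programs that need search or full-text retrieval, ElasticSearch is very often used as the full-text search engine to improve search efficiency and performance.

In this section, we cover ElasticSearch in a single article, so that you can start using it after one read.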

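I. Introduction to ElasticSearch:

ElasticSearch is a search server built on Lucene. It provides a distributed, multi-tenant full-text search engine exposed through a RESTful web interface. Elasticsearch is written in Java, was originally released as open source under the Apache License, and is a popular enterprise search engine today. It is designed for cloud environments and offers near-real-time search while being stable, reliable, fast, and easy to install and use.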

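Core ElasticSearch concepts:

a) Index: comparable to a database in a relational database.
b) Type: comparable to a table in a database. Mapping types are deprecated since 6.x, and the 7.x client code later in this article does not use them.
c) id: the primary key, comparable to the primary key of a database record; it is unique.
d) Document: comparable to one row of data.
The document is the basic unit in ElasticSearch. Documents are represented as JSON.
JSON data is stored under a type inside an index.
e) Field: a property of a document. For each field you define how it is indexed and how it can be searched.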
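To make the mapping above concrete, here is a minimal sketch of how an index name and a document id combine into the REST path of a single document. It assumes the typeless `/{index}/_doc/{id}` endpoint form of Elasticsearch 7.x and Java 10 or later; the class and method names (`DocumentAddress`, `path`) are illustrative and not part of any client library.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Illustrative helper, not part of any Elasticsearch client. */
final class DocumentAddress {
    private DocumentAddress() {}

    /** Builds the REST path of one document: /{index}/_doc/{id}. */
    static String path(String index, String id) {
        // URL-encode the id so that ids with spaces or slashes stay one path segment
        String encodedId = URLEncoder.encode(id, StandardCharsets.UTF_8).replace("+", "%20");
        return "/" + index + "/_doc/" + encodedId;
    }

    public static void main(String[] args) {
        // index "user" ~ database, id "1008" ~ primary key, JSON body ~ one row
        System.out.println("PUT " + path("user", "1008"));
        System.out.println("{\"name\":\"茅河野人\",\"age\":28}");
    }
}
```

The JSON body that accompanies such a request is the document itself, and each key in it is a field.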

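II. Installing ElasticSearch:

1. Install the JDK first.

2. Install ElasticSearch.

Go to the official elasticsearch website and download the installation package: https://www.elastic.co/downloads/elasticsearch. This tutorial uses version 5.1.1.

Upload the downloaded package to your CentOS machine, or download it directly on CentOS with the wget command.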

Unzip:

unzip elasticsearch-5.1.1.zip

Run:

cd elasticsearch-5.1.1/bin
./elasticsearch
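Once the node is running, a quick way to confirm it is reachable is an HTTP GET against its root URL, which returns basic node information as JSON. The sketch below uses only the JDK HTTP client (Java 11+). It assumes the node listens on `localhost:9200` over plain HTTP without authentication; `NodeCheck` and `rootRequest` are illustrative names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Illustrative connectivity check using only the JDK. */
final class NodeCheck {
    private NodeCheck() {}

    /** Builds a GET request for the node's root endpoint. */
    static HttpRequest rootRequest(String host, int port) {
        return HttpRequest.newBuilder(URI.create("http://" + host + ":" + port + "/"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a local node on the default HTTP port 9200
        HttpClient http = HttpClient.newHttpClient();
        HttpResponse<String> response =
                http.send(rootRequest("localhost", 9200), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```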

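III. Working with ElasticSearch from Java:

1. Maven dependencies

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.6.2</version>
</dependency>

The Java examples use client version 7.6.2; the high-level REST client should be paired with a server of the same major version.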

2. Connecting to ElasticSearch

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;

public class EsClientTest {

    public static void main(String[] args) throws IOException {
        // Replace "IP" with the address of your Elasticsearch node
        RestHighLevelClient esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("IP", 9200, "http"))
        );
        // Building the client sends no request, so this only confirms the client was created
        System.out.println("success");
        // Release the underlying HTTP connections
        esClient.close();
    }
}

3. Shared client used by the API examples

    // Shared client used by all of the following examples
    public static RestHighLevelClient esClient;

    static {
        esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("IP", 9200, "http"))
        );
    }

4. Creating an index:

    /**
     * Create an index.
     * @throws IOException
     */
    public static void createIndex() throws IOException {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest("user");
        CreateIndexResponse indexResponse = esClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        boolean acknowledged = indexResponse.isAcknowledged();
        System.out.println("Index creation acknowledged: " + acknowledged);
    }

5. Getting an index:

    /**
     * Query index information.
     * @throws IOException
     */
    public static void getIndex() throws IOException {
        GetIndexRequest getIndexRequest = new GetIndexRequest("user");
        GetIndexResponse getIndexResponse = esClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
        System.out.println(getIndexResponse.getAliases());
        System.out.println(getIndexResponse.getMappings());
        System.out.println(getIndexResponse.getSettings());
    }

6. Deleting an index:

    /**
     * Delete an index.
     * @throws IOException
     */
    public static void deleteIndex() throws IOException {
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("user");
        AcknowledgedResponse delete = esClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
        System.out.println("Index deletion acknowledged: " + delete.isAcknowledged());
    }

7. Adding a document:

    // Jackson ObjectMapper used to serialize User objects to JSON
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Add a document.
     * @throws Exception
     */
    public static void add() throws Exception {
        IndexRequest indexRequest = new IndexRequest();
        indexRequest.index("user").id("1008");
        User user = new User();
        user.setName("茅河野人");
        user.setAge(28);
        user.setSex("男");
        user.setSalary(50000);

        String userData = objectMapper.writeValueAsString(user);
        indexRequest.source(userData, XContentType.JSON);
        // Index the document
        IndexResponse response = esClient.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(response.status());
        System.out.println(response.getResult());
    }

8. Updating a document:

    /**
     * Update a document.
     * @throws Exception
     */
    public static void update() throws Exception {
        UpdateRequest request = new UpdateRequest();
        request.index("user").id("1008");
        // Partial update: only the listed field/value pairs are changed
        request.doc(XContentType.JSON, "name", "茅河野人");
        // Apply the update
        UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);
        System.out.println(response.getResult());
    }
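For orientation, a partial update like the one above corresponds to a REST call that sends only the changed fields wrapped in a `doc` object. The sketch below builds that path and body with plain string concatenation. It assumes the 7.x endpoint form `POST /{index}/_update/{id}` and field names and values that need no JSON escaping; `UpdateBodySketch` is an illustrative name.

```java
/** Illustrative builder for the REST form of a partial update. */
final class UpdateBodySketch {
    private UpdateBodySketch() {}

    /** Path of the update endpoint for one document. */
    static String path(String index, String id) {
        return "/" + index + "/_update/" + id;
    }

    /** Body carrying a single changed string field inside "doc". */
    static String partialDoc(String field, String value) {
        // Assumption: field and value contain no quotes or backslashes
        return "{\"doc\":{\"" + field + "\":\"" + value + "\"}}";
    }

    public static void main(String[] args) {
        System.out.println("POST " + path("user", "1008"));
        System.out.println(partialDoc("name", "茅河野人"));
    }
}
```

Fields that are not listed inside `doc` keep their current values, which is the difference from re-indexing the whole document.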

9. Deleting a document:

    /**
     * Delete a document.
     * @throws Exception
     */
    public static void delete() throws Exception {
        DeleteRequest request = new DeleteRequest();
        request.index("user").id("1008");
        // Delete the document
        DeleteResponse delete = esClient.delete(request, RequestOptions.DEFAULT);
        System.out.println(delete.getResult());
    }

10. Adding documents in bulk:

    /**
     * Bulk insert.
     * @throws Exception
     */
    public static void batchInsert() throws Exception {
        BulkRequest bulkRequest = new BulkRequest();

        // Documents are stored under the ids 1002 to 1007, in this order
        User[] users = {
                new User("关羽", "男", 33, 5500),
                new User("黄忠", "男", 50, 8000),
                new User("黄忠2", "男", 49, 10000),
                new User("赵云", "男", 33, 12000),
                new User("马超", "男", 38, 20000),
                new User("关羽", "男", 41, 27000)
        };
        for (int i = 0; i < users.length; i++) {
            String id = String.valueOf(1002 + i);
            String userData = objectMapper.writeValueAsString(users[i]);
            bulkRequest.add(new IndexRequest().index("user").id(id).source(userData, XContentType.JSON));
        }

        // All index operations are sent in a single request
        BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        System.out.println(bulkResponse.status());
        System.out.println("Has failures: " + bulkResponse.hasFailures());
    }
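Behind a bulk request, the Bulk API expects a newline-delimited JSON body: one action line per operation, followed by the document source on the next line, with a trailing newline at the end. The sketch below builds such a body for index operations. It assumes the index name and ids need no JSON escaping and that each document is already serialized as single-line JSON; `BulkBodySketch` is an illustrative name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative builder for the newline-delimited body of the Bulk API. */
final class BulkBodySketch {
    private BulkBodySketch() {}

    /** One action line plus one source line per document, each ending in '\n'. */
    static String indexActions(String index, Map<String, String> jsonById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> entry : jsonById.entrySet()) {
            // Action line: which operation, which index, which id
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_id\":\"").append(entry.getKey()).append("\"}}\n");
            // Source line: the document itself
            body.append(entry.getValue()).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>(); // keeps insertion order
        docs.put("1002", "{\"name\":\"关羽\",\"age\":33}");
        docs.put("1003", "{\"name\":\"黄忠\",\"age\":50}");
        System.out.print(indexActions("user", docs));
    }
}
```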

11. Deleting documents in bulk:

    /**
     * Bulk delete.
     * @throws Exception
     */
    public static void batchDelete() throws Exception {
        BulkRequest bulkRequest = new BulkRequest();
        // Queue one delete operation per document id
        String[] ids = {"1002", "1003", "1004", "1005", "1006", "1007"};
        for (String id : ids) {
            bulkRequest.add(new DeleteRequest().index("user").id(id));
        }

        BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        System.out.println(bulkResponse.status());
        System.out.println("Has failures: " + bulkResponse.hasFailures());
    }

13. Querying all data under an index:

    /**
     * Query all data under an index.
     * @throws Exception
     */
    public static void searchIndexAll() throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices("user");
        // match_all query: matches every document in the index
        SearchSourceBuilder query = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
        request.source(query);
        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        for (SearchHit searchHit : hits) {
            System.out.println(searchHit.getSourceAsString());
        }
    }

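14. Querying by condition:

    /**
     * Query by condition.
     * @throws Exception
     */
    public static void searchByCondition() throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices("user");
        // Term query: matches documents whose "sex" field contains the exact term
        TermQueryBuilder sexQueryBuilder = QueryBuilders.termQuery("sex", "女");
        SearchSourceBuilder query = new SearchSourceBuilder().query(sexQueryBuilder);
        request.source(query);
        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        // Number of hits returned in this response, then the total hit count
        System.out.println(hits.getHits().length);
        System.out.println(hits.getTotalHits());
        for (SearchHit searchHit : hits) {
            System.out.println(searchHit.getSourceAsString());
        }
    }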
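It can help to see the Query DSL body that a term query corresponds to when it is sent to the `_search` endpoint. The sketch below builds that JSON by hand. The `escape` helper is deliberately minimal (it handles only backslashes and double quotes, not control characters), and `TermQueryJson` is an illustrative name rather than a client class.

```java
/** Illustrative builder for the JSON body of a term query. */
final class TermQueryJson {
    private TermQueryJson() {}

    /** Minimal JSON string escaping: backslash and double quote only. */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Body of the form {"query":{"term":{"<field>":{"value":"<value>"}}}}. */
    static String termQuery(String field, String value) {
        return "{\"query\":{\"term\":{\"" + escape(field)
                + "\":{\"value\":\"" + escape(value) + "\"}}}}";
    }

    public static void main(String[] args) {
        // Equivalent in spirit to QueryBuilders.termQuery("sex", "女")
        System.out.println("POST /user/_search");
        System.out.println(termQuery("sex", "女"));
    }
}
```

A term query compares against the stored terms without analyzing the search value, so it suits exact values such as keywords, flags, or ids.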

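15. Paginated query:

    /**
     * Paginated query.
     * @throws Exception
     */
    public static void searchByPage() throws Exception {
        SearchRequest request = new SearchRequest();
        request.indices("user");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
        // from = offset of the first hit, size = number of hits per page
        sourceBuilder.from(0).size(3);
        request.source(sourceBuilder);
        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        // Number of hits returned in this page, then the total hit count
        System.out.println(hits.getHits().length);
        System.out.println(hits.getTotalHits());
        for (SearchHit searchHit : hits) {
            System.out.println(searchHit.getSourceAsString());
        }
    }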
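The `from` value is an offset, not a page number, so a page index has to be converted first. The sketch below does that conversion and also guards against the default `index.max_result_window` of 10,000, beyond which `from + size` requests are rejected unless the setting is changed. `PageWindow` is an illustrative name, and pages are assumed to be 0-based.

```java
/** Illustrative helper that turns a 0-based page number into a "from" offset. */
final class PageWindow {
    /** Default value of the index.max_result_window index setting. */
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    private PageWindow() {}

    /** Offset of the first hit on the given 0-based page. */
    static int from(int page, int size) {
        if (page < 0 || size <= 0) {
            throw new IllegalArgumentException("page must be >= 0 and size must be > 0");
        }
        long from = (long) page * size; // long arithmetic avoids int overflow
        if (from + size > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds " + DEFAULT_MAX_RESULT_WINDOW);
        }
        return (int) from;
    }

    public static void main(String[] args) {
        System.out.println(from(0, 3)); // first page starts at offset 0
        System.out.println(from(2, 3)); // third page starts at offset 6
    }
}
```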

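IV. Using ElasticSearch in Spring Boot

1. Maven dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>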

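2. Configuration file (the keys below use properties syntax, as in application.properties):

# ES server address
elasticsearch.host=IP
# ES server port
elasticsearch.port=9200
# Log level: enable debug logging
logging.level.com.congge=debug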
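In Spring Boot these keys are bound to a configuration class automatically, but the key/value format itself can be illustrated with the JDK alone. The sketch below parses the two `elasticsearch.*` keys with `java.util.Properties`. The fallback values (`localhost`, `9200`) and the name `EsSettings` are assumptions of this sketch, not part of the original setup.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustrative holder for the two connection settings. */
final class EsSettings {
    final String host;
    final int port;

    private EsSettings(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Parses properties-formatted text; lines starting with '#' are comments. */
    static EsSettings parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Assumed fallbacks for missing keys
        String host = props.getProperty("elasticsearch.host", "localhost").trim();
        int port = Integer.parseInt(props.getProperty("elasticsearch.port", "9200").trim());
        return new EsSettings(host, port);
    }

    public static void main(String[] args) {
        EsSettings settings = parse("# ES server\nelasticsearch.host=10.0.0.5\nelasticsearch.port=9200\n");
        System.out.println(settings.host + ":" + settings.port);
    }
}
```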

3. A practical example:

Create an entity class

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "shopping", shards = 3, replicas = 1)
public class Product {
    // An id is required; it is the globally unique identifier and corresponds to "_id" in ES
    @Id
    private Long id; // unique product identifier

    /**
     * type     : data type of the field
     * analyzer : analyzer used to tokenize the field
     * index    : whether the field is indexed (default: true)
     * Keyword  : a phrase that is stored as-is, without tokenization
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String title; // product name

    @Field(type = FieldType.Keyword)
    private String category; // category name

    @Field(type = FieldType.Double)
    private Double price; // product price

    @Field(type = FieldType.Keyword, index = false)
    private String images; // image URL
}
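The `shards` and `replicas` values on `@Document` determine how many shard copies the cluster has to hold: each primary shard gets the given number of replica copies. A tiny sketch of that arithmetic follows; `ShardMath` is an illustrative name.

```java
/** Illustrative arithmetic for shard counts. */
final class ShardMath {
    private ShardMath() {}

    /** Total shard copies = primaries * (1 primary copy + replicas per primary). */
    static int totalShardCopies(int primaryShards, int replicasPerPrimary) {
        return primaryShards * (1 + replicasPerPrimary);
    }

    public static void main(String[] args) {
        // shards = 3, replicas = 1, as in the Product entity
        System.out.println(totalShardCopies(3, 1));
    }
}
```

With `shards = 3` and `replicas = 1`, that is 3 primaries plus 3 replicas. Because a replica is never allocated on the same node as its primary, a single-node cluster leaves the replicas unassigned.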

Provide a repository interface:

import com.congge.entity.Product;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

// Type parameters: the entity type and the type of its id
@Repository
public interface ProductDao extends ElasticsearchRepository<Product, Long> {
}

Configuration classes:

import lombok.Data;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
// The Spring Data class of the same name is not imported; the local copy in com.congge.config is used
//import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

@ConfigurationProperties(prefix = "elasticsearch")
@Configuration
@Data
public class EsConfig extends com.congge.config.AbstractElasticsearchConfiguration {

    // Bound from elasticsearch.host and elasticsearch.port
    private String host;
    private Integer port;

    // Override the parent method to supply the client
    @Override
    public RestHighLevelClient elasticsearchClient() {
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
        return new RestHighLevelClient(builder);
    }
}

package com.congge.config;

import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;

public abstract class AbstractElasticsearchConfiguration extends ElasticsearchConfigurationSupport {

    // Subclasses must override this method
    public abstract RestHighLevelClient elasticsearchClient();

    @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
    public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter) {
        return new ElasticsearchRestTemplate(elasticsearchClient(), elasticsearchConverter);
    }
}

Test 1:

import com.congge.entity.Product;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class EsIndexTest {

    // Inject ElasticsearchRestTemplate
    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    // Create the index and apply the mapping configuration
    @Test
    public void createIndex() {
        // The index is created automatically when the application context starts
        System.out.println("Index created");
    }

    @Test
    public void deleteIndex() {
        // Delete the index that belongs to the Product entity
        boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
        System.out.println("Index deleted = " + flg);
    }
}

Test 2:

import com.congge.dao.ProductDao;
import com.congge.entity.Product;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
public class EsDocTest {

    @Autowired
    private ProductDao productDao;

    /**
     * Create
     */
    @Test
    public void save() {
        Product product = new Product();
        product.setId(2L);
        product.setTitle("ipad mini");
        product.setCategory("ipad");
        product.setPrice(1998.0);
        product.setImages("http://ipad.jpg");
        productDao.save(product);
    }

    // Update: saving an entity with an existing id overwrites the stored document
    @Test
    public void update() {
        Product product = new Product();
        product.setId(2L);
        product.setTitle("iphone");
        product.setCategory("mobile");
        product.setPrice(6999.0);
        product.setImages("http://www.phone.jpg");
        productDao.save(product);
    }

    // Find by id
    @Test
    public void findById() {
        Product product = productDao.findById(2L).get();
        System.out.println(product);
    }

    // Find all
    @Test
    public void findAll() {
        Iterable<Product> products = productDao.findAll();
        for (Product product : products) {
            System.out.println(product);
        }
    }

    // Delete
    @Test
    public void delete() {
        Product product = new Product();
        product.setId(2L);
        productDao.delete(product);
    }

    // Bulk create
    @Test
    public void saveAll() {
        List<Product> productList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Product product = new Product();
            product.setId(Long.valueOf(i));
            product.setTitle("iphone" + i);
            product.setCategory("mobile");
            product.setPrice(5999.0 + i);
            product.setImages("http://www.phone.jpg");
            productList.add(product);
        }
        productDao.saveAll(productList);
    }

    // Paginated query
    @Test
    public void findByPageable() {
        // Sort settings (direction: ascending or descending, and the field to sort by)
        Sort sort = Sort.by(Sort.Direction.DESC, "id");
        int currentPage = 0; // current page; the first page is 0, 1 is the second page
        int pageSize = 5;    // number of items per page
        // Build the page request
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize, sort);
        // Run the paginated query
        Page<Product> productPage = productDao.findAll(pageRequest);
        for (Product product : productPage.getContent()) {
            System.out.println(product);
        }
    }

    /**
     * Term query.
     * search(termQueryBuilder) runs the search; the argument is the query builder object.
     */
    @Test
    public void termQuery() {
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "iphone");
        Iterable<Product> products = productDao.search(termQueryBuilder);
        for (Product product : products) {
            System.out.println(product);
        }
    }

    /**
     * Term query with pagination.
     */
    @Test
    public void termQueryByPage() {
        int currentPage = 0;
        int pageSize = 5;
        // Build the page request
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "phone");
        Iterable<Product> products = productDao.search(termQueryBuilder, pageRequest);
        for (Product product : products) {
            System.out.println(product);
        }
    }
}

V. Example: writing MySQL data into Elasticsearch

package com.example.esdemo.service.impl;

import com.example.esdemo.config.DBHelper;
import com.example.esdemo.imports.ImportDb2Es;
import com.example.esdemo.service.ImportService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
 * Implementation of the DB-to-ES import.
 */
@Component
public class ImportServiceImpl implements ImportService {

    private static final Logger logger = LogManager.getLogger(ImportServiceImpl.class);

    @Autowired
    private RestHighLevelClient client;

    @Override
    public void importDb2Es(ImportDb2Es importDb2Es) {
        // The table name is used as the index name as well
        writeMySQLDataToES(importDb2Es.getDbTableName(), importDb2Es.getDbTableName());
    }

    private void writeMySQLDataToES(String tableName, String esIndexName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        Connection connection = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            connection = DBHelper.getConn();
            logger.info("start handle data :" + tableName);
            String sql = "select * from " + tableName;
            ps = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            // Set fetchSize according to your own needs
            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetMetaData colData = rs.getMetaData();
            ArrayList<HashMap<String, String>> dataList = new ArrayList<>();
            int count = 0;
            while (rs.next()) {
                count++;
                HashMap<String, String> map = new HashMap<>(128);
                // JDBC column indexes are 1-based, so the last index equals getColumnCount()
                for (int i = 1; i <= colData.getColumnCount(); i++) {
                    String c = colData.getColumnName(i); // column name
                    String v = rs.getString(c);          // column value
                    map.put(c, v);
                }
                dataList.add(map);
                // Hand over every 10,000 rows; the last partial batch is handled after the loop
                if (count % 10000 == 0) {
                    logger.info("mysql handle data  number:" + count);
                    // Add the rows to the bulkProcessor
                    for (HashMap<String, String> row : dataList) {
                        bulkProcessor.add(new IndexRequest(esIndexName).source(row));
                    }
                    // Clear the buffer after each hand-over
                    dataList.clear();
                }
            }
            // Handle the rows that have not been handed over yet
            for (HashMap<String, String> row : dataList) {
                bulkProcessor.add(new IndexRequest(esIndexName).source(row));
            }
            bulkProcessor.flush();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            try {
                // Any of these can still be null if an earlier step failed
                if (rs != null) rs.close();
                if (ps != null) ps.close();
                if (connection != null) connection.close();
                boolean terminated = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
                logger.info("bulk processor closed: " + terminated);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("Try to insert data number : " + request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // A completed bulk call can still contain failed items
                if (response.hasFailures()) {
                    logger.error("Bulk has failures : " + response.buildFailureMessage());
                } else {
                    logger.info("************** Success insert data number : "
                            + request.numberOfActions() + " , id: " + executionId);
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
            }
        };
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
                .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
        BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
        builder.setBulkActions(5000);                                   // flush after 5000 actions
        builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));  // or after 100 MB
        builder.setConcurrentRequests(10);
        builder.setFlushInterval(TimeValue.timeValueSeconds(100L));     // or after 100 seconds
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        // Note: build() is what makes the settings above take effect
        return builder.build();
    }
}
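The core pattern in the import above is buffering rows and handing them over in fixed-size batches, with one final flush for the remainder. The sketch below isolates that pattern with plain JDK types so it can be tried without a database or a cluster. `Batcher` and its `sink` callback are illustrative names; in the real import the sink would be the code that adds requests to the `BulkProcessor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative fixed-size batcher: collects items and emits them in batches. */
final class Batcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    Batcher(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one item and emits a batch once the buffer is full. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Emits whatever is buffered; call once at the end for the last partial batch. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Hand over a copy so that clearing the buffer cannot affect the receiver
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        Batcher<Integer> batcher = new Batcher<>(10, batch -> System.out.println("batch of " + batch.size()));
        for (int i = 0; i < 25; i++) {
            batcher.add(i);
        }
        batcher.flush(); // emits the remaining 5 items
    }
}
```

Handing over a copy of the buffer is a deliberate choice here: it keeps the batch valid even if the receiver processes it after the buffer has been cleared.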