在web网站的架设中特别是数据量大的网站或者APP小程序需要搜索或者全文检索的场景,几乎都需要借助ElasticSearch来作为全文检索引擎,以提高网站的搜索效率和性能。
这一节,我们通过一篇文章介绍,使大家通过一文就学会使用ElasticSearch。
一、ElasticSearch介绍:
ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。
ElasticSearch相关概念:
a)、索引index,相当于数据库中的database。
b)、类型type相当于数据库中的table。
c)、主键id相当于数据库中记录的主键,是唯一的。
d)、文档 document (相当于一条数据)
文档是ElasticSearch的基本单位。在Es中文档以JSON格式来表示
向es中的index下面的type中存储json类型的数据。
e) 、字段是文档中的field 属性,需要对每一个属性定义索引和被搜索的方式
二、ElasticSearch的安装:
1、先安装jdk
2、安装ElasticSearch
直接进入elasticsearch的官网,下载最新的安装包:https://www.elastic.co/downloads/elasticsearch,此教程使用的是5.1.1版本。
将下载的安装包上传到centos,或者直接在centos使用wget命令下载。
解压:
unzip elasticsearch-5.1.1.zip
运行:
cd bin./elasticsearch
三、java语言操作ElasticSearch:
1、maven依赖
org.elasticsearch elasticsearch 7.6.2 org.elasticsearch.client elasticsearch-rest-high-level-client 7.6.2
2、连接ElasticSearch
import org.apache.http.HttpHost;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestHighLevelClient; import java.io.IOException; public class EsClientTest { public static void main(String[] args) throws IOException { RestHighLevelClient esClient = new RestHighLevelClient( RestClient.builder(new HttpHost("IP",9200,"http")) ); System.out.println("success"); esClient.close(); } }
3、连接的相关api
public static RestHighLevelClient esClient; static { esClient = new RestHighLevelClient( RestClient.builder(new HttpHost("IP", 9200, "http")) ); }
4、创建索引操作:
/** * 创建索引 * @throws IOException */ public static void createIndex() throws IOException { CreateIndexRequest createIndexRequest = new CreateIndexRequest("user"); CreateIndexResponse indexResponse = esClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); boolean acknowledged = indexResponse.isAcknowledged(); System.out.println("索引创建状态:" + acknowledged); }
5、获取索引:
/** * 索引信息查询 * @throws IOException */ public static void getIndex() throws IOException { GetIndexRequest getIndexRequest = new GetIndexRequest("user"); GetIndexResponse getIndexResponse = esClient.indices().get(getIndexRequest, RequestOptions.DEFAULT); System.out.println(getIndexResponse.getAliases()); System.out.println(getIndexResponse.getMappings()); System.out.println(getIndexResponse.getSettings()); }
6、删除索引:
/** * 删除索引 * @throws IOException */ public static void deleteIndex() throws IOException { DeleteIndexRequest getIndexRequest = new DeleteIndexRequest("user"); AcknowledgedResponse delete = esClient.indices().delete(getIndexRequest, RequestOptions.DEFAULT); System.out.println("索引删除状态:" + delete.isAcknowledged()); }
7、添加数据:
/** * 添加数据 * @throws Exception */ public static void add() throws Exception{ IndexRequest indexRequest = new IndexRequest(); indexRequest.index("user").id("1008"); User user = new User(); user.setName("茅河野人"); user.setAge(28); user.setSex("男"); user.setSalary(50000); String userData = objectMapper.writeValueAsString(user); indexRequest.source(userData,XContentType.JSON); //插入数据 IndexResponse response = esClient.index(indexRequest, RequestOptions.DEFAULT); System.out.println(response.status()); System.out.println(response.getResult()); }
8、修改数据:
/** * 修改数据 * @throws Exception */ public static void update() throws Exception{ UpdateRequest request = new UpdateRequest(); request.index("user").id("1008"); request.doc(XContentType.JSON,"name","茅河野人"); //插入数据 UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT); System.out.println(response.getResult()); }
9、删除数据:
/** * 删除 * @throws Exception */ public static void delete() throws Exception{ DeleteRequest request = new DeleteRequest(); request.index("user").id("1008"); //插入数据 DeleteResponse delete = esClient.delete(request, RequestOptions.DEFAULT); System.out.println(delete.getResult()); }
10、批量添加数据:
/** * 批量添加 * @throws Exception */ public static void batchInsert() throws Exception{ BulkRequest bulkRequest = new BulkRequest(); User user1 = new User("关羽","男",33,5500); String userData1 = objectMapper.writeValueAsString(user1); IndexRequest indexRequest1 = new IndexRequest().index("user").id("1002").source(userData1, XContentType.JSON); bulkRequest.add(indexRequest1); User user2 = new User("黄忠","男",50,8000); String userData2 = objectMapper.writeValueAsString(user2); IndexRequest indexRequest2 = new IndexRequest().index("user").id("1003").source(userData2, XContentType.JSON); bulkRequest.add(indexRequest2); User user3 = new User("黄忠2","男",49,10000); String userData3 = objectMapper.writeValueAsString(user3); IndexRequest indexRequest3 = new IndexRequest().index("user").id("1004").source(userData3, XContentType.JSON); bulkRequest.add(indexRequest3); User user4 = new User("赵云","男",33,12000); String userData4 = objectMapper.writeValueAsString(user4); IndexRequest indexRequest4 = new IndexRequest().index("user").id("1005").source(userData4, XContentType.JSON); bulkRequest.add(indexRequest4); User user5 = new User("马超","男",38,20000); String userData5 = objectMapper.writeValueAsString(user5); IndexRequest indexRequest5 = new IndexRequest().index("user").id("1006").source(userData5, XContentType.JSON); bulkRequest.add(indexRequest5); User user6 = new User("关羽","男",41,27000); String userData6 = objectMapper.writeValueAsString(user6); IndexRequest indexRequest6 = new IndexRequest().index("user").id("1007").source(userData6, XContentType.JSON); bulkRequest.add(indexRequest6); BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT); System.out.println(bulkResponse.status()); System.out.println(bulkResponse.getItems()); }
11、批量删除数据:
/** * 批量删除 * @throws Exception */ public static void batchDelete() throws Exception{ BulkRequest bulkRequest = new BulkRequest(); DeleteRequest indexRequest1 = new DeleteRequest().index("user").id("1002"); DeleteRequest indexRequest2 = new DeleteRequest().index("user").id("1003"); DeleteRequest indexRequest3 = new DeleteRequest().index("user").id("1004"); DeleteRequest indexRequest4 = new DeleteRequest().index("user").id("1005"); DeleteRequest indexRequest5 = new DeleteRequest().index("user").id("1006"); DeleteRequest indexRequest6 = new DeleteRequest().index("user").id("1007"); bulkRequest.add(indexRequest1); bulkRequest.add(indexRequest2); bulkRequest.add(indexRequest3); bulkRequest.add(indexRequest4); bulkRequest.add(indexRequest5); bulkRequest.add(indexRequest6); BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT); System.out.println(bulkResponse.status()); System.out.println(bulkResponse.getItems()); }
13、删除某个索引下所有数据:
/** * 查询某个索引下的所有数据 * @throws Exception */ public static void searchIndexAll() throws Exception{ SearchRequest request = new SearchRequest(); request.indices("user"); // 索引中的全部数据查询 SearchSourceBuilder query = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()); request.source(query); SearchResponse response = esClient.search(request, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits){ System.out.println(searchHit.getSourceAsString()); } }
14、根据条件查询:
TermQueryBuilder ageQueryBuilder = QueryBuilders.termQuery("sex", "女"); SearchSourceBuilder query = new SearchSourceBuilder().query(ageQueryBuilder); request.source(query); SearchResponse response = esClient.search(request, RequestOptions.DEFAULT); System.out.println(response.getHits().getHits()); System.out.println(response.getHits().getTotalHits()); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits){ System.out.println(searchHit.getSourceAsString()); }
15、分页查询:
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()); sourceBuilder.from(0).size(3); request.source(sourceBuilder); SearchResponse response = esClient.search(request, RequestOptions.DEFAULT); System.out.println(response.getHits().getHits()); System.out.println(response.getHits().getTotalHits()); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits){ System.out.println(searchHit.getSourceAsString()); }
四、在springboot中的运用
1、maven依赖
org.springframework.boot spring-boot-starter-data-elasticsearch
2、yml配置文件:
# es 服务地址elasticsearch.host=IP# es 服务端口elasticsearch.port=9200# 配置日志级别,开启 debug 日志logging.level.com.congge=debug
3、实际例子:
创建一个实体类
import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;import org.springframework.data.annotation.Id;import org.springframework.data.elasticsearch.annotations.Document;import org.springframework.data.elasticsearch.annotations.Field;import org.springframework.data.elasticsearch.annotations.FieldType; @Data@NoArgsConstructor@AllArgsConstructor@ToString@Document(indexName = "shopping", shards = 3, replicas = 1)public class Product { //必须有 id,这里的 id 是全局唯一的标识,等同于 es 中的"_id" @Id private Long id;//商品唯一标识 /** * type : 字段数据类型 * analyzer : 分词器类型 * index : 是否索引(默认:true) * Keyword : 短语,不进行分词 */ @Field(type = FieldType.Text, analyzer = "ik_max_word") private String title;//商品名称 @Field(type = FieldType.Keyword) private String category;//分类名称 @Field(type = FieldType.Double) private Double price;//商品价格 @Field(type = FieldType.Keyword, index = false) private String images;//图片地址}
提供接口:
import com.congge.entity.Product;import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;import org.springframework.stereotype.Repository; @Repositorypublic interface ProductDao extends ElasticsearchRepository{ }
配置类:
import lombok.Data;import org.apache.http.HttpHost;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestClientBuilder;import org.elasticsearch.client.RestHighLevelClient;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Configuration;//import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration; @ConfigurationProperties(prefix = "elasticsearch")@Configuration@Datapublic class EsConfig extends com.congge.config.AbstractElasticsearchConfiguration { private String host ; private Integer port ; //重写父类方法 @Override public RestHighLevelClient elasticsearchClient() { RestClientBuilder builder = RestClient.builder(new HttpHost(host, port)); RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder); return restHighLevelClient; }}
import org.elasticsearch.client.RestHighLevelClient;import org.springframework.context.annotation.Bean;import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport;import org.springframework.data.elasticsearch.core.ElasticsearchOperations;import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; public abstract class AbstractElasticsearchConfiguration extends ElasticsearchConfigurationSupport { //需重写本方法 public abstract RestHighLevelClient elasticsearchClient(); @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" }) public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter) { return new ElasticsearchRestTemplate(elasticsearchClient(), elasticsearchConverter); }}
测试1:
import com.congge.entity.Product;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class)@SpringBootTestpublic class EsIndexTest { //注入 ElasticsearchRestTemplate @Autowired private ElasticsearchRestTemplate elasticsearchRestTemplate; //创建索引并增加映射配置 @Test public void createIndex(){ //创建索引,系统初始化会自动创建索引 System.out.println("创建索引"); } @Test public void deleteIndex(){ //创建索引,系统初始化会自动创建索引 boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class); System.out.println("删除索引 = " + flg); } }
测试2:
import com.congge.dao.ProductDao;import com.congge.entity.Product;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.index.query.TermQueryBuilder;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.data.domain.Page;import org.springframework.data.domain.PageRequest;import org.springframework.data.domain.Sort;import org.springframework.test.context.junit4.SpringRunner; import java.util.ArrayList;import java.util.List; @RunWith(SpringRunner.class)@SpringBootTestpublic class EsDocTest { @Autowired private ProductDao productDao; /** * 新增 */ @Test public void save() { Product product = new Product(); product.setId(2L); product.setTitle("ipad mini"); product.setCategory("ipad"); product.setPrice(1998.0); product.setImages("http://ipad.jpg"); productDao.save(product); } //修改 @Test public void update(){ Product product = new Product(); product.setId(2L); product.setTitle("iphone"); product.setCategory("mobile"); product.setPrice(6999.0); product.setImages("http://www.phone.jpg"); productDao.save(product); } //根据 id 查询 @Test public void findById(){ Product product = productDao.findById(2L).get(); System.out.println(product); } //查询所有 @Test public void findAll(){ Iterable products = productDao.findAll(); for (Product product : products) { System.out.println(product); } } //删除 @Test public void delete(){ Product product = new Product(); product.setId(2L); productDao.delete(product); } //批量新增 @Test public void saveAll(){ List productList = new ArrayList(); for (int i = 0; i < 10; i++) { Product product = new Product(); product.setId(Long.valueOf(i)); product.setTitle("iphone" + i); product.setCategory("mobile"); product.setPrice(5999.0 + i); product.setImages("http://www.phone.jpg"); productList.add(product); } productDao.saveAll(productList); } //分页查询 @Test public void findByPageable(){ //设置排序(排序方式,正序还是倒序,排序的 id) Sort sort = Sort.by(Sort.Direction.DESC,"id"); int currentPage=0;//当前页,第一页从 0 开始, 1 表示第二页 int pageSize = 5;//每页显示多少条 //设置查询分页 PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort); //分页查询 Page productPage = productDao.findAll(pageRequest); for (Product Product : productPage.getContent()) { System.out.println(Product); } } /** * term 查询 * search(termQueryBuilder) 调用搜索方法,参数查询构建器对象 */ @Test public void termQuery(){ TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "iphone"); Iterable products = productDao.search(termQueryBuilder); for (Product product : products) { System.out.println(product); } } /** * term 查询加分页 */ @Test public void termQueryByPage(){ int currentPage= 0 ; int pageSize = 5; //设置查询分页 PageRequest pageRequest = PageRequest.of(currentPage, pageSize); TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "phone"); Iterable products = productDao.search(termQueryBuilder,pageRequest); for (Product product : products) { System.out.println(product); } } }
五、将mysql数据写入Elasticsearch例子
package com.example.esdemo.service.impl;import com.example.esdemo.config.DBHelper;import com.example.esdemo.imports.ImportDb2Es;import com.example.esdemo.service.ImportService;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.elasticsearch.action.ActionListener;import org.elasticsearch.action.bulk.BackoffPolicy;import org.elasticsearch.action.bulk.BulkProcessor;import org.elasticsearch.action.bulk.BulkRequest;import org.elasticsearch.action.bulk.BulkResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.unit.ByteSizeUnit;import org.elasticsearch.common.unit.ByteSizeValue;import org.elasticsearch.common.unit.TimeValue;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.sql.*;import java.util.ArrayList;import java.util.HashMap;import java.util.concurrent.TimeUnit;import java.util.function.BiConsumer;/** * 导入db2es 实现类 */@Componentpublic class ImportServiceImpl implements ImportService { private static final Logger logger = LogManager.getLogger(ImportServiceImpl.class); @Autowired private RestHighLevelClient client; @Override public void importDb2Es(ImportDb2Es importDb2Es) { writeMySQLDataToES(importDb2Es.getDbTableName(),importDb2Es.getDbTableName()); } private void writeMySQLDataToES(String tableName,String esIndeName) { BulkProcessor bulkProcessor = getBulkProcessor(client); Connection connection = null; PreparedStatement ps = null; ResultSet rs = null; try { connection = DBHelper.getConn(); logger.info("start handle data :" + tableName); String sql = "select * from " + tableName; ps = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // 根据自己需要设置 fetchSize ps.setFetchSize(20); rs = ps.executeQuery(); ResultSetMetaData colData = rs.getMetaData(); ArrayList<HashMap> dataList = new ArrayList(); HashMap map = null; int count = 0; // c 就是列的名字 v 就是列对应的值 String c = null; String v = null; while (rs.next()) { count++; map = new HashMap(128); for (int i = 1; i < colData.getColumnCount(); i++) { c = colData.getColumnName(i); v = rs.getString(c); map.put(c, v); } dataList.add(map); // 每1万条 写一次 不足的批次的数据 最后一次提交处理 if (count % 10000 == 0) { logger.info("mysql handle data number:" + count); // 将数据添加到 bulkProcessor for (HashMap hashMap2 : dataList) { bulkProcessor.add(new IndexRequest(esIndeName).source(hashMap2)); } // 每提交一次 清空 map 和 dataList map.clear(); dataList.clear(); } } // 处理 未提交的数据 for (HashMap hashMap2 : dataList) { bulkProcessor.add(new IndexRequest(esIndeName).source(hashMap2)); } bulkProcessor.flush(); } catch (SQLException e) { e.printStackTrace(); } finally { try { rs.close(); ps.close(); connection.close(); boolean terinaFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS); logger.info(terinaFlag); } catch (Exception e) { e.printStackTrace(); } } } private BulkProcessor getBulkProcessor(RestHighLevelClient client) { BulkProcessor bulkProcessor = null; try { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { logger.info("Try to insert data number : " + request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { logger.info("************** Success insert data number : " + request.numberOfActions() + " , id: " + executionId); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId); } }; BiConsumer<BulkRequest, ActionListener> bulkConsumer = (request, bulkListener) -> client .bulkAsync(request, RequestOptions.DEFAULT, bulkListener); BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); builder.setBulkActions(5000); builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB)); builder.setConcurrentRequests(10); builder.setFlushInterval(TimeValue.timeValueSeconds(100L)); builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // 注意点:让参数设置生效 bulkProcessor = builder.build(); } catch (Exception e) { e.printStackTrace(); try { bulkProcessor.awaitClose(100L, TimeUnit.SECONDS); } catch (Exception e1) { logger.error(e1.getMessage()); } } return bulkProcessor; }}