前言
Elasticsearch Java API Client 是 ES 官方在 8.x 版本主推的新 Java 客户端,也适用于 7.17.x 版本的 ES。
本文主要参考了相关博文,自己手动编写了下相关操作代码,包括更新mappings等操作的java代码。
代码示例已上传github。
版本
- elasticsearch 版本:7.17.9。修改 /elasticsearch-7.17.9/config/elasticsearch.yml,新增一行配置 `xpack.security.enabled: false`,避免出现安全功能未启用的提示。
- cerebro 版本:0.8.5(在浏览器中连接 es 的工具)
- jdk 版本:11
elasticsearch-java
版本:
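<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>7.17.9</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.3</version>
</dependency>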
连接
/**
 * Builds an {@link ElasticsearchClient} for the given server URL and logs the
 * response of {@code info()} before handing the client back.
 *
 * @param serverUrl address of the Elasticsearch node, parsed with {@code HttpHost.create}
 * @return a client backed by a low-level REST client and a Jackson JSON mapper
 * @throws IOException propagated from the {@code info()} call
 */
public static ElasticsearchClient getEsClient(String serverUrl) throws IOException {
    HttpHost host = HttpHost.create(serverUrl);
    RestClient lowLevelClient = RestClient.builder(host).build();

    ElasticsearchClient client = new ElasticsearchClient(
            new RestClientTransport(lowLevelClient, new JacksonJsonpMapper()));

    log.info("{}", client.info());
    return client;
}
索引
创建索引
/**
 * Creates the index unless {@link #existsIndex} reports that it is already there.
 *
 * @param esClient  client used to issue the requests
 * @param indexName name of the index to create
 * @throws IOException propagated from the client calls
 */
public static void createIndex(ElasticsearchClient esClient, String indexName) throws IOException {
    if (existsIndex(esClient, indexName)) {
        log.info("index name: {} exists!", indexName);
        return;
    }
    CreateIndexResponse created = esClient.indices().create(req -> req.index(indexName));
    log.info("create index name: {}, ack: {}", indexName, created.acknowledged());
}

/**
 * Checks whether an index exists.
 *
 * @param esClient  client used to issue the request
 * @param indexName name of the index to look for
 * @return the boolean value carried by the exists response
 * @throws IOException propagated from the client call
 */
public static boolean existsIndex(ElasticsearchClient esClient, String indexName) throws IOException {
    return esClient.indices().exists(req -> req.index(indexName)).value();
}
查询索引
public static void getIndex(ElasticsearchClient esClient, String indexName) throws IOException { GetIndexResponse getIndexResponse = esClient.indices().get(s -> s.index(indexName)); Map result = getIndexResponse.result(); result.forEach((k, v) -> log.info("get index key: {}, value= {}", k, v)); // 查看全部索引 IndicesResponse indicesResponse = esClient.cat().indices(); indicesResponse.valueBody().forEach(i -> log.info("get all index, health: {}, status: {}, uuid: {}", i.health(), i.status(), i.uuid())); }
删除索引
/**
 * Deletes the index when it exists; otherwise only logs that it was not found.
 *
 * @param esClient  client used to issue the requests
 * @param indexName name of the index to delete
 * @throws IOException propagated from the client calls
 */
public static void delIndex(ElasticsearchClient esClient, String indexName) throws IOException {
    if (!existsIndex(esClient, indexName)) {
        log.info("index: {} not found!", indexName);
        return;
    }
    log.info("index: {} exists!", indexName);
    DeleteIndexResponse deleted = esClient.indices().delete(req -> req.index(indexName));
    log.info("删除索引操作结果:{}", deleted.acknowledged());
}
更新Mappings
/**
 * Sends a put-mapping request for the index using the supplied property map
 * and logs whether the request was acknowledged.
 *
 * @param esClient    client used to issue the request
 * @param indexName   index whose mappings are updated
 * @param documentMap field name to property definition, passed to {@code properties(...)}
 * @throws IOException propagated from the client call
 */
public static void updateMappings(ElasticsearchClient esClient, String indexName, Map documentMap) throws IOException {
    PutMappingResponse mappingResponse = esClient.indices()
            .putMapping(m -> m.index(indexName).properties(documentMap));
    boolean acked = mappingResponse.acknowledged();
    log.info("update mappings ack: {}", acked);
}
使用更新Mappings方法
@Test public void updateMappingsTest() throws IOException { Map documentMap = new HashMap(); documentMap.put("name", Property.of(p -> p.text(TextProperty.of(t -> t.index(true))))); documentMap.put("location", Property.of(p -> p.geoPoint(GeoPointProperty.of(g -> g.ignoreZValue(true))))); // index 设置为 true,才可以使用 search range 功能 documentMap.put("age", Property.of(p -> p.integer(IntegerNumberProperty.of(i -> i.index(true))))); EsUtils.updateMappings(esClient, indexName, documentMap); }
文档操作
实体类
/**
 * Document entity used by the document and search examples.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Product {

    /** Document id; the examples pass it as the Elasticsearch document id. */
    private String id;

    /** Name; the single-search example matches against the "name" field. */
    private String name;

    /** Location as a string; NOTE(review): format expected by the geo_point mapping is not shown here. */
    private String location;

    /** Age; the paged-search example filters and sorts on the "age" field. */
    private Integer age;

    /** Polygon as a string; NOTE(review): expected format is not shown here. */
    private String polygon;
}
新增
/**
 * Indexes a single product under its own id and logs the JSON value of the result.
 *
 * @param esClient  client used to issue the request
 * @param indexName target index
 * @param product   document to index; its {@code getId()} is used as the document id
 * @throws IOException propagated from the client call
 */
public static void addOneDocument(ElasticsearchClient esClient, String indexName, Product product) throws IOException {
    IndexResponse indexed = esClient.index(req -> req
            .index(indexName)
            .id(product.getId())
            .document(product));
    String outcome = indexed.result().jsonValue();
    log.info("add one document result: {}", outcome);
}
批量新增
/**
 * Indexes all given products with one bulk request, using each product's id
 * as the document id.
 *
 * <p>Successful items are logged at info level. Failed items are logged at
 * error level, and only when the response reports errors.
 *
 * @param esClient  client used to issue the request
 * @param indexName default index for every operation in the bulk request
 * @param products  documents to index; a null or empty list is skipped without a request
 * @throws IOException propagated from the client call
 */
public static void batchAddDocument(ElasticsearchClient esClient, String indexName, List<Product> products) throws IOException {
    // A bulk request without operations is rejected by the server, so do not send one.
    if (products == null || products.isEmpty()) {
        log.info("bulk add skipped: no documents supplied");
        return;
    }

    List<BulkOperation> bulkOperations = new ArrayList<>(products.size());
    for (Product p : products) {
        bulkOperations.add(BulkOperation.of(b -> b.index(c -> c.id(p.getId()).document(p))));
    }

    BulkResponse bulkResponse = esClient.bulk(s -> s.index(indexName).operations(bulkOperations));
    bulkResponse.items().forEach(item -> {
        if (item.error() != null) {
            log.error("bulk add failed, id: {}, reason: {}", item.id(), item.error().reason());
        } else {
            log.info("bulk response result = {}", item.result());
        }
    });
    if (bulkResponse.errors()) {
        log.error("bulk response.error() = {}", bulkResponse.errors());
    }
}
查询
public static void getDocument(ElasticsearchClient esClient, String indexName, String id) throws IOException { GetResponse getResponse = esClient.get(s -> s.index(indexName).id(id), Product.class); if (getResponse.found()) { Product source = getResponse.source(); log.info("get response: {}", source); } // 判断文档是否存在 BooleanResponse booleanResponse = esClient.exists(s -> s.index(indexName).id(id)); log.info("文档id:{},是否存在:{}", id, booleanResponse.value()); }
更新
/**
 * Updates the document whose id equals {@code product.getId()}, sending the
 * product as the partial document, and logs the result.
 *
 * @param esClient  client used to issue the request
 * @param indexName index containing the document
 * @param product   source of both the document id and the update content
 * @throws IOException propagated from the client call
 */
public static void updateDocument(ElasticsearchClient esClient, String indexName, Product product) throws IOException {
    // Parameterized response type instead of the raw UpdateResponse.
    UpdateResponse<Product> updateResponse = esClient.update(
            s -> s.index(indexName).id(product.getId()).doc(product),
            Product.class);
    log.info("update doc result: {}", updateResponse.result());
}
删除
/**
 * Deletes a document by id and logs the result. An {@link IOException} is
 * caught and logged here instead of being propagated.
 *
 * @param esClient  client used to issue the request
 * @param indexName index containing the document
 * @param id        id of the document to delete
 */
public static void deleteDocument(ElasticsearchClient esClient, String indexName, String id) {
    try {
        DeleteResponse removed = esClient.delete(req -> req
                .index(indexName)
                .id(id));
        log.info("del doc result: {}", removed.result());
    } catch (IOException ioFailure) {
        log.error("del doc failed, error: ", ioFailure);
    }
}
批量删除
/**
 * Deletes the documents with the given ids using one bulk request.
 *
 * <p>Successful items are logged at info level. Failed items are logged at
 * error level, and only when the response reports errors. An
 * {@link IOException} is caught and logged instead of being propagated.
 *
 * @param esClient  client used to issue the request
 * @param indexName default index for every operation in the bulk request
 * @param ids       ids of the documents to delete; a null or empty list is skipped without a request
 */
public static void batchDeleteDocument(ElasticsearchClient esClient, String indexName, List<String> ids) {
    // A bulk request without operations is rejected by the server, so do not send one.
    if (ids == null || ids.isEmpty()) {
        log.info("batch del skipped: no ids supplied");
        return;
    }

    List<BulkOperation> bulkOperations = new ArrayList<>(ids.size());
    for (String docId : ids) {
        bulkOperations.add(BulkOperation.of(b -> b.delete(c -> c.id(docId))));
    }

    try {
        BulkResponse bulkResponse = esClient.bulk(r -> r.index(indexName).operations(bulkOperations));
        bulkResponse.items().forEach(item -> {
            if (item.error() != null) {
                log.error("batch del failed, id: {}, reason: {}", item.id(), item.error().reason());
            } else {
                log.info("batch del result: {}", item.result());
            }
        });
        if (bulkResponse.errors()) {
            log.error("batch del bulk resp errors: {}", bulkResponse.errors());
        }
    } catch (IOException e) {
        log.error("batch del doc failed, error: ", e);
    }
}
搜索
单个搜索
public static void searchOne(ElasticsearchClient esClient, String indexName, String searchText) throws IOException { SearchResponse searchResponse = esClient.search(s -> s .index(indexName) // 搜索请求的查询部分(搜索请求也可以有其他组件,如聚合) .query(q -> q // 在众多可用的查询变体中选择一个。我们在这里选择匹配查询(全文搜索) .match(t -> t .field("name") .query(searchText))), Product.class); TotalHits total = searchResponse.hits().total(); boolean isExactResult = total != null && total.relation() == TotalHitsRelation.Eq; if (isExactResult) { log.info("search has: {} results", total.value()); } else { log.info("search more than : {} results", total.value()); } List<Hit> hits = searchResponse.hits().hits(); for (Hit hit : hits) { Product source = hit.source(); log.info("Found result: {}", source); } }
分页搜索
public static void searchPage(ElasticsearchClient esClient, String indexName, String searchText) throws IOException { Query query = RangeQuery.of(r -> r .field("age") .gte(JsonData.of(8))) ._toQuery(); SearchResponse searchResponse = esClient.search(s -> s .index(indexName) .query(q -> q .bool(b -> b.must(query))) // 分页查询,从第0页开始查询四个doc .from(0) .size(4) // 按id降序排列 .sort(f -> f .field(o -> o .field("age").order(SortOrder.Desc))), Product.class); List<Hit> hits = searchResponse.hits().hits(); for (Hit hit : hits) { Product product = hit.source(); log.info("search page result: {}", product); } }
参考
- ElasticSearch官方文档
- Elasticsearch Java API Client
- 俊逸的博客(ElasticSearchx系列)
- Elasticsearch Java API 客户端(中文文档)