Hello ElasticSearch - Clients
ElasticSearch provides official clients for a variety of languages. At their core, these clients simply assemble DSL statements and send them to ElasticSearch over HTTP. Official documentation: https://www.elastic.co/docs/reference/elasticsearch-clients.
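To make the "assemble DSL, send over HTTP" point concrete, here is a minimal sketch that builds (but does not send) the HTTP request a client would issue for a match query, using only the JDK's java.net.http API (Java 11 or later). The base URL http://localhost:9200, the index name product, and the class name RawDslRequest are illustrative assumptions; a real client also takes care of authentication, JSON serialization, and response parsing.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class RawDslRequest {

    /** Builds the Query DSL body for a match query. Inputs are assumed to need no JSON escaping. */
    static String matchQuery(String field, String text) {
        return "{\"query\":{\"match\":{\"" + field + "\":\"" + text + "\"}}}";
    }

    /** Builds a POST {baseUrl}/{index}/_search request that carries the DSL as a JSON body. */
    static HttpRequest searchRequest(String baseUrl, String index, String dsl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/" + index + "/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(dsl))
                .build();
    }

    public static void main(String[] args) {
        String dsl = matchQuery("title", "iphone");
        // Hypothetical local node; nothing is sent over the network here.
        HttpRequest request = searchRequest("http://localhost:9200", "product", dsl);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(dsl);
    }
}
```

The typed builders shown in the rest of this page produce the same kind of JSON body, with the compiler checking the structure.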
1. Java API Client
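Java API Client is a completely redesigned library that became the official Java client as of ElasticSearch 8.0. It is the successor to the High Level Rest Client (HLRC) and aims to be a more modern, lightweight, and maintainable client. It is fully decoupled from the server code and uses code generation to stay consistent with the REST API.
It can coexist with HLRC and share the same Low Level REST Client, which makes gradual migration easier.
1.1. Initializing ElasticsearchClient
1.1.1. Dependency management
The key parts of the parent pom.xml are as follows: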
XML
<packaging>pom</packaging>

<properties>
    <spring-boot.version>2.7.6</spring-boot.version>
    <elasticsearch.version>8.18.3</elasticsearch.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>
spring-boot-dependencies-2.7.6 defaults to ElasticSearch 7.17.7 and does not declare the elasticsearch-java dependency. We therefore set elasticsearch.version to 8.18.3 in the parent pom.xml (make sure it matches the ElasticSearch server version) and add a dependency declaration for elasticsearch-java.
Note that elasticsearch-java depends on elasticsearch-rest-client, and the elasticsearch.version property defined here does not override the elasticsearch.version property inside spring-boot-dependencies-2.7.6, because an imported BOM resolves properties from its own POM rather than from the importing one. We therefore also have to declare the elasticsearch-rest-client version explicitly.
1.1.2. Adding the dependencies
XML
<!-- ElasticSearch API Client -->
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
</dependency>
<!-- Jackson -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
<!-- Jakarta JSON API -->
<dependency>
    <groupId>jakarta.json</groupId>
    <artifactId>jakarta.json-api</artifactId>
    <version>2.0.1</version>
</dependency>
elasticsearch-java-8.18.3 depends on jakarta.json-api-2.0.1.
1.1.3. Instantiating ElasticsearchClient
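Java
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticSearchConfiguration {
    @Value("${elasticsearch.url}")
    private String url;
    @Value("${elasticsearch.api-key}")
    private String apiKey;

    @Bean(destroyMethod = "close")
    public ElasticsearchClient elasticsearchClient() {
        // Low-level REST client; every request carries the API key
        RestClient restClient = RestClient
                .builder(HttpHost.create(url))
                .setDefaultHeaders(new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKey)})
                .build();
        // The transport maps typed requests and responses to JSON via Jackson
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }
}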
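The Authorization header in this configuration expects the API key in its encoded form, that is, the Base64 encoding of id:api_key (the encoded value returned when the key is created). If only the raw id and api_key are at hand, the header value can be derived roughly as follows. The class name ApiKeyHeader and the sample credentials are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class ApiKeyHeader {

    /** Returns the Authorization header value: "ApiKey " followed by Base64("id:api_key"). */
    static String authorizationValue(String id, String apiKey) {
        String credentials = id + ":" + apiKey;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "ApiKey " + encoded;
    }

    public static void main(String[] args) {
        // Hypothetical credentials, for illustration only.
        System.out.println(authorizationValue("my-key-id", "my-key-secret"));
    }
}
```

If the elasticsearch.api-key property already holds the encoded value, no extra encoding step is needed.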
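If ElasticSearch uses a self-signed certificate:
Java
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import javax.net.ssl.SSLContext;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.message.BasicHeader;
import org.apache.http.ssl.SSLContextBuilder;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticSearchConfiguration {
    @Value("${elasticsearch.certificate}")
    private String certificate;
    @Value("${elasticsearch.url}")
    private String url;
    @Value("${elasticsearch.api-key}")
    private String apiKey;

    @Bean(destroyMethod = "close")
    public ElasticsearchClient elasticsearchClient() throws IOException, GeneralSecurityException {
        // Load the CA certificate; try-with-resources closes the stream even on failure
        CertificateFactory cf = CertificateFactory.getInstance("X.509");
        Certificate caCert;
        try (InputStream certInputStream = new FileInputStream(certificate)) {
            caCert = cf.generateCertificate(certInputStream);
        }
        // In-memory trust store that contains only this CA
        KeyStore trustStore = KeyStore.getInstance("JKS");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", caCert);
        SSLContext sslContext = SSLContextBuilder.create()
                .loadTrustMaterial(trustStore, null)
                .build();
        RestClient restClient = RestClient
                .builder(HttpHost.create(url))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        // Disables hostname verification; drop this line if the certificate matches the host name
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE))
                .setDefaultHeaders(new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKey)})
                .build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }
}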
1.2. Index operations
1.2.1. Creating an index
The earlier index-creation example, reimplemented with ElasticsearchClient:
Java
@Test
void createProductIndex() throws IOException {
    CreateIndexRequest request = CreateIndexRequest.of(builder -> builder
            .index("product")
            .mappings(mappings -> mappings
                    // Full-text field analyzed with the ik_smart analyzer
                    .properties("title", prop -> prop.text(text -> text.analyzer("ik_smart")))
                    // Keyword field that is not indexed
                    .properties("brand", prop -> prop.keyword(keyword -> keyword.index(false)))
                    // Object field with two keyword sub-fields
                    .properties("sku", prop -> prop.object(obj -> obj
                            .properties("name", subProp -> subProp.keyword(keyword -> keyword))
                            .properties("value", subProp -> subProp.keyword(keyword -> keyword))))));
    CreateIndexResponse response = esClient.indices().create(request);
    if (response.acknowledged()) {
        System.out.println("Index created");
    } else {
        System.out.println("Failed to create index 'product'");
    }
}
1.2.2. Deleting an index
Java
@Test
void deleteProductIndex() throws IOException {
    DeleteIndexRequest request = DeleteIndexRequest.of(builder -> builder.index("product"));
    DeleteIndexResponse response = esClient.indices().delete(request);
    if (response.acknowledged()) {
        System.out.println("Index deleted");
    } else {
        System.out.println("Failed to delete index 'product'");
    }
}
1.2.3. Checking whether an index exists
Java
@Test
void testExistsProductIndex() throws IOException {
    ExistsRequest request = ExistsRequest.of(builder -> builder.index("product"));
    BooleanResponse exists = esClient.indices().exists(request);
    System.out.println(exists.value());
}
1.3. Document operations
1.3.1. Preparing the document class
Java
@Data
@Accessors(chain = true)
public class ProductDoc {
    private String title;
    private String brand;
    private List<SKU> sku;

    @Data
    @Accessors(chain = true)
    public static class SKU {
        private String name;
        private String value;
    }
}
1.3.2. Indexing a document
Java
@Test
void addProductToIndex() throws IOException {
    // Typed as IndexRequest<ProductDoc> so the request matches the document class
    IndexRequest<ProductDoc> request = IndexRequest.of(builder -> builder
            .index("product")
            .id("1")
            .document(new ProductDoc()
                    .setTitle("小米14智能手机 16GB+512GB 黑色")
                    .setBrand("Xiaomi")
                    .setSku(Collections.singletonList(new ProductDoc.SKU()
                            .setName("型号")
                            .setValue("Mi14-512GB-Black")))));
    IndexResponse response = esClient.index(request);
    System.out.println(response.result());
}
1.3.3. Getting a document by ID
Java
@Test
void getProductDocById() throws IOException {
    GetRequest request = GetRequest.of(builder -> builder.index("product").id("1"));
    GetResponse<ProductDoc> response = esClient.get(request, ProductDoc.class);
    // source() is null when the document does not exist
    ProductDoc source = response.source();
    System.out.println(source);
}
1.3.4. Partially updating a document by ID
Java
@Test
void updProductDocById() throws IOException {
    UpdateRequest<ProductDoc, ProductDoc> request = UpdateRequest.of(builder -> builder
            .index("product")
            .id("1")
            // Partial document: only the fields set here are sent
            .doc(new ProductDoc()
                    .setTitle("iPhone 2000")
                    .setBrand("Apple")));
    UpdateResponse<ProductDoc> response = esClient.update(request, ProductDoc.class);
    System.out.println(response.result());
}
1.3.5. Deleting a document by ID
Java
@Test
void delProductDocById() throws IOException {
    DeleteRequest request = DeleteRequest.of(builder -> builder.index("product").id("1"));
    DeleteResponse response = esClient.delete(request);
    System.out.println(response.result());
}
1.3.6. Bulk operations
Create the bulk request with BulkRequest.Builder and add one IndexOperation per product in a loop. Bulk requests improve throughput, but keep the batch size appropriate to the workload (100 to 1,000 documents per request is usually reasonable).
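To stay within such a batch size, a large list can be split into fixed-size chunks, with one bulk request sent per chunk. Below is a minimal sketch of the chunking step only, using just the JDK. Batches and partition are hypothetical names, and the actual send (one esClient.bulk(...) call per chunk) is left out.

```java
import java.util.ArrayList;
import java.util.List;

final class Batches {

    /** Splits items into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 2500; i++) {
            ids.add(i);
        }
        // 2500 items with a batch size of 1000 give batches of 1000, 1000, and 500
        for (List<Integer> batch : partition(ids, 1000)) {
            System.out.println("would send one bulk request with " + batch.size() + " documents");
        }
    }
}
```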
Java
@Test
void addProductsToIndexBulk() throws IOException {
    BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
    for (int i = 1; i <= 3; i++) {
        String id = String.valueOf(i);
        ProductDoc product = new ProductDoc()
                .setTitle("Product Title " + id)
                .setBrand("Brand " + id)
                .setSku(Collections.singletonList(new ProductDoc.SKU()
                        .setName("SKU Name " + id)
                        .setValue("SKU Value " + id)));
        // One index operation per product
        bulkBuilder.operations(op -> op
                .index(idx -> idx
                        .index("product")
                        .id(id)
                        .document(product)));
    }
    BulkResponse bulkResponse = esClient.bulk(bulkBuilder.build());
    // errors() is true if at least one item failed; inspect the items individually
    if (bulkResponse.errors()) {
        bulkResponse.items().forEach(item -> {
            if (item.error() != null) {
                System.out.println("Failed to index document " + item.id() + ": " + item.error().reason());
            } else {
                System.out.println("Successfully indexed document " + item.id() + ": " + item.result());
            }
        });
    } else {
        System.out.println("All documents indexed successfully");
    }
}
1.4. Query operations
1.4.1. match_all
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.matchAll(m -> m)),
        ProductDoc.class);
1.4.2. match
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.match(m -> m
                        .field("title")
                        .query("iphone"))),
        ProductDoc.class);
1.4.3. multi_match
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.multiMatch(m -> m
                        .query("iphone")
                        .fields("title", "brand"))),
        ProductDoc.class);
1.4.4. term
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.term(t -> t
                        .field("brand")
                        .value("Apple"))),
        ProductDoc.class);
1.4.5. range
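Java
// Assumes the index also maps a numeric "price" field, which the mapping in 1.2.1 does not define
esClient.search(s -> s
                .index("product")
                .query(q -> q.range(r -> r.number(n -> n
                        .field("price")
                        .gte(5000.0)
                        .lte(8000.0)))),
        ProductDoc.class);
1.4.6. geo_distance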
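geo_distance keeps documents whose geo point lies within the given distance of a center point. As a rough illustration of what that distance means, the sketch below computes the great-circle (haversine) distance between two coordinates and compares it with a radius. It is for intuition only: it assumes a spherical Earth with a mean radius of 6371 km, it is not ElasticSearch's internal implementation, and GeoDistanceSketch as well as the second sample point are hypothetical.

```java
final class GeoDistanceSketch {

    private static final double EARTH_RADIUS_KM = 6371.0; // assumed mean Earth radius

    /** Great-circle distance in kilometers between two points given in degrees. */
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    /** True if the point lies within radiusKm of the center. */
    static boolean within(double centerLat, double centerLon, double lat, double lon, double radiusKm) {
        return haversineKm(centerLat, centerLon, lat, lon) <= radiusKm;
    }

    public static void main(String[] args) {
        // Center (39.915, 116.414) and a sample point a few kilometers away
        double km = haversineKm(39.915, 116.414, 39.879, 116.46584);
        System.out.println("distance in km: " + km);
        System.out.println("within 12 km: " + within(39.915, 116.414, 39.879, 116.46584, 12.0));
    }
}
```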
Java
// Assumes the index also maps a geo_point field named "geo", which the mapping in 1.2.1 does not define
esClient.search(s -> s
                .index("product")
                .query(q -> q.geoDistance(g -> g
                        .field("geo")
                        .distance("12km")
                        .location(l -> l.latlon(b -> b.lat(39.915).lon(116.414))))),
        ProductDoc.class);
1.4.7. geo_bounding_box
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.geoBoundingBox(g -> g
                        .field("geo")
                        .boundingBox(b -> b.tlbr(tlbr -> tlbr
                                .topLeft(tl -> tl.latlon(l -> l.lat(39.915).lon(116.414)))
                                .bottomRight(br -> br.latlon(l -> l.lat(39.879).lon(116.46584))))))),
        ProductDoc.class);
1.4.8. bool
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.bool(b -> b
                        // must clauses contribute to the score; filter clauses only restrict the result set
                        .must(m -> m.match(mm -> mm.field("title").query("iphone")))
                        .filter(f -> f.term(t -> t.field("sku.name").value("SKU Name 1")))
                        .filter(f -> f.term(t -> t.field("sku.value").value("SKU Value 1"))))),
        ProductDoc.class);
1.4.9. function_score
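In function_score, each function whose filter matches contributes its weight. score_mode decides how those contributions are combined, and boost_mode decides how the combined value is merged with the query score. With both set to sum, as in this section's example, a document that matches at least one function scores query_score + sum(matching weights). The sketch below only does that arithmetic. FunctionScoreSketch is a hypothetical name, the query score 1.5 is made up, and documents that match no function are deliberately not modeled.

```java
import java.util.Arrays;
import java.util.List;

final class FunctionScoreSketch {

    /** score_mode = sum and boost_mode = sum: query score plus the weights of all matching functions. */
    static double sumSum(double queryScore, List<Double> matchingWeights) {
        if (matchingWeights.isEmpty()) {
            throw new IllegalArgumentException("no matching function: case not modeled in this sketch");
        }
        double functionScore = 0.0;
        for (double weight : matchingWeights) {
            functionScore += weight;
        }
        return queryScore + functionScore;
    }

    public static void main(String[] args) {
        double queryScore = 1.5; // made-up relevance score of the match query
        // Document matching both filters (weights 10 and 100)
        System.out.println(sumSum(queryScore, Arrays.asList(10.0, 100.0))); // prints 111.5
        // Document matching only the first filter (weight 10)
        System.out.println(sumSum(queryScore, Arrays.asList(10.0))); // prints 11.5
    }
}
```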
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.functionScore(fs -> fs
                        .query(q2 -> q2.match(m -> m.field("title").query("iphone")))
                        .functions(
                                FunctionScore.of(f -> f
                                        .filter(ff -> ff.term(t -> t.field("sku.name").value("SKU Name 1")))
                                        .weight(10.0)),
                                FunctionScore.of(f -> f
                                        .filter(ff -> ff.term(t -> t.field("sku.value").value("SKU Value 1")))
                                        .weight(100.0)))
                        .scoreMode(FunctionScoreMode.Sum)
                        .boostMode(FunctionBoostMode.Sum))),
        ProductDoc.class);
1.4.10. Sorting by a regular field
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.matchAll(m -> m))
                .sort(ss -> ss.field(f -> f.field("price").order(SortOrder.Asc))),
        ProductDoc.class);
1.4.11. Sorting by geo distance
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.matchAll(m -> m))
                .sort(ss -> ss.geoDistance(g -> g
                        .field("geo")
                        .location(l -> l.latlon(b -> b.lat(39.915).lon(116.414)))
                        .unit(DistanceUnit.Kilometers)
                        .order(SortOrder.Asc))),
        ProductDoc.class);
1.4.12. Pagination
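from is a zero-based offset and size is the page length, so page n (counting from 1) starts at (n - 1) * size. By default ElasticSearch rejects requests where from + size exceeds 10,000 (the index.max_result_window setting). A small helper under those assumptions follows; PageOffsets is a hypothetical name, and the limit is hard-coded to the default rather than read from the index settings.

```java
final class PageOffsets {

    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000; // default index.max_result_window

    /** Returns the "from" offset for a 1-based page number and a page size. */
    static int from(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be at least 1");
        }
        long from = (long) (page - 1) * size;
        if (from + size > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds " + DEFAULT_MAX_RESULT_WINDOW);
        }
        return (int) from;
    }

    public static void main(String[] args) {
        System.out.println(from(1, 2)); // prints 0
        System.out.println(from(3, 2)); // prints 4
    }
}
```

The value returned by from(page, size) is what goes into .from(...), with .size(size) next to it.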
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.matchAll(m -> m))
                .sort(ss -> ss.field(f -> f.field("price").order(SortOrder.Asc)))
                .from(0)
                .size(2),
        ProductDoc.class);
1.4.13. Highlighting
Java
esClient.search(s -> s
                .index("product")
                .query(q -> q.match(m -> m.field("title").query("iphone")))
                // Wrap matched terms in the title field with <em> tags
                .highlight(h -> h.fields("title", f -> f
                        .preTags("<em>")
                        .postTags("</em>")
                        .requireFieldMatch(true))),
        ProductDoc.class);
1.5. Aggregations
1.5.1. Bucket aggregations
Java
SearchResponse<ProductDoc> response = esClient.search(s -> s
                .index("product")
                .query(q -> q.matchAll(m -> m))
                // size(0): return only aggregation results, no hits
                .size(0)
                .aggregations("by_brand", a -> a
                        .terms(t -> t
                                .field("brand")
                                .order(Collections.singletonList(NamedValue.of("_count", SortOrder.Asc))))),
        ProductDoc.class);
for (StringTermsBucket brandBucket : response.aggregations().get("by_brand").sterms().buckets().array()) {
    String key = brandBucket.key().stringValue();
    long count = brandBucket.docCount();
    System.out.println("key: " + key + ", count: " + count);
}
1.5.2. Metric aggregations
Java
SearchResponse<ProductDoc> response = esClient.search(s -> s
                .index("product")
                .query(q -> q.matchAll(m -> m))
                .size(0)
                .aggregations("by_brand", a -> a
                        .terms(t -> t
                                .field("brand")
                                // Order the brand buckets by the average price from the sub-aggregation
                                .order(Collections.singletonList(NamedValue.of("by_price.avg", SortOrder.Desc))))
                        .aggregations("by_price", subAgg -> subAgg
                                .stats(st -> st.field("price")))),
        ProductDoc.class);
for (StringTermsBucket brandBucket : response.aggregations().get("by_brand").sterms().buckets().array()) {
    String key = brandBucket.key().stringValue();
    long count = brandBucket.docCount();
    // Per-bucket price statistics
    StatsAggregate byPriceStats = brandBucket.aggregations().get("by_price").stats();
    long priceCount = byPriceStats.count();
    double priceMin = byPriceStats.min();
    double priceMax = byPriceStats.max();
    double priceAvg = byPriceStats.avg();
    double priceSum = byPriceStats.sum();
    System.out.println("key: " + key + ", count: " + count + ", priceCount: " + priceCount + ", priceMin: " + priceMin + ", " +
            "priceMax: " + priceMax + ", priceAvg: " + priceAvg + ", priceSum: " + priceSum);
}
1.6. Autocomplete
Java
// Assumes the index also maps a completion field named "suggest", which the mapping in 1.2.1 does not define
SearchResponse<Void> response = esClient.search(s -> s
        .index("product")
        .suggest(ss -> ss.suggesters("product-suggest", suggs -> suggs
                .prefix("hw")
                .completion(c -> c
                        .field("suggest")
                        .skipDuplicates(true)
                        .size(10)))));
for (Suggestion<Void> suggestion : response.suggest().get("product-suggest")) {
    for (CompletionSuggestOption<Void> option : suggestion.completion().options()) {
        System.out.println(option.text());
    }
}