Elasticsearch (3): API Calls

 2 years ago
source link: https://zhouj000.github.io/2021/12/30/es3/

Elasticsearch (1): Getting Started
Elasticsearch (2): Search
Elasticsearch (3): API Calls

Java API

  • Java REST Client
    • Java Low Level REST Client
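      • A low-level REST client that talks to the cluster over HTTP; the caller must marshal the request JSON and parse the response JSON
      • Compatible with all Elasticsearch versions
    • Java High Level REST Client
      • A high-level REST client built on the low-level one; it adds APIs that marshal request JSON and parse response JSON
      • The client version must match the Elasticsearch server version, otherwise version problems occur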
  • Java Client
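
With the low-level client, building the request JSON is the caller's job. Below is a minimal sketch of what that hand-marshalling involves, using only the JDK. The class `ManualJson` and its methods are hypothetical helpers, not part of any Elasticsearch library, and the sketch assumes a flat object with string values only:

```java
import java.util.Map;

final class ManualJson {
    // Escape the characters that must not appear raw inside a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.toString();
    }

    // Serialize a flat map of string fields into a JSON object (assumption: no nesting).
    static String toJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) sb.append(',');
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }
}
```

The resulting string is what would be handed to `request.setJsonEntity(...)`. In practice a JSON library is usually the safer choice, which is part of what the high-level client takes care of.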

Low Level REST Client

Add the Maven dependency:

<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-client</artifactId>
	<version>7.15.2</version>
</dependency>

A simple demo:

RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")).build();

Request request = new Request("GET", "/");
// request.setEntity(new NStringEntity("{\"json\":\"text\"}", ContentType.APPLICATION_JSON));
// request.setJsonEntity("{\"json\":\"text\"}");
// request.addParameter("key", "value");
// Synchronous
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println(responseBody);

// Asynchronous
Cancellable cancellable = restClient.performRequestAsync(request,
    new ResponseListener() {
        @Override
        public void onSuccess(Response response) {
            // doSomething
        }

        @Override
        public void onFailure(Exception exception) {
            // doSomething
        }
});
// Cancel: the client aborts the in-flight HTTP request
// cancellable.cancel();

// Close the client only after all requests have completed
restClient.close();

Several options can be configured when building the RestClient:

RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
// Default headers sent with every request, e.g. Authorization
Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
builder.setDefaultHeaders(defaultHeaders);
// Listener that is notified when a node fails
builder.setFailureListener(new RestClient.FailureListener() {
    @Override
    public void onFailure(Node node) {
        // doSomething
    }
});
// Timeouts
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
	@Override
	public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
		return requestConfigBuilder.setConnectTimeout(5000)
								   .setSocketTimeout(60000);
	}
});
// Authentication (basic auth)
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("user", "user-password"));
// Set the credentials provider and the number of I/O threads
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
	@Override
	public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
		// httpClientBuilder.disableAuthCaching();
		return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
								.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
	}
});

RestClient restClient = builder.build();

Options can also be configured per request:

RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// Header
builder.addHeader("Authorization", TOKEN); 
// Response consumer; the buffer limit is an int byte count, and 30 * 1024 * 1024 * 1024 overflows int, so 30 MiB is used here
builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
RequestOptions COMMON_OPTIONS = builder.build();

request.setOptions(COMMON_OPTIONS);
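
The heap buffer limit above is passed as an `int` number of bytes, so the size arithmetic deserves a second look. The sketch below is a plain-JDK illustration of how a product such as `30 * 1024 * 1024 * 1024` wraps around silently, and how `Math.multiplyExact` can guard against it. `BufferLimit` is a hypothetical helper, not part of the client:

```java
final class BufferLimit {
    // int arithmetic wraps silently: 30 * 1024^3 does not fit in 32 bits.
    static int naiveThirtyGiB() {
        return 30 * 1024 * 1024 * 1024;
    }

    // Math.multiplyExact throws ArithmeticException instead of wrapping.
    static int checkedBytes(int value, int unitBytes) {
        return Math.multiplyExact(value, unitBytes);
    }

    public static void main(String[] args) {
        System.out.println(naiveThirtyGiB());                // a negative number
        System.out.println(checkedBytes(30, 1024 * 1024));   // 30 MiB fits in an int
    }
}
```

Computing the limit through a checked helper like this turns a silent wrap-around into an immediate, obvious failure.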

Concurrent asynchronous requests:

final CountDownLatch latch = new CountDownLatch(documents.length);
for (int i = 0; i < documents.length; i++) {
    Request request = new Request("PUT", "/posts/doc/" + i);
    request.setEntity(documents[i]);
    restClient.performRequestAsync(request,
            new ResponseListener() {
                @Override
                public void onSuccess(Response response) {
					// doSomething
                    latch.countDown();
                }

                @Override
                public void onFailure(Exception exception) {
                    // doSomething
                    latch.countDown();
                }
            }
    );
}
latch.await();
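
The pattern above counts the latch down in both callbacks, so `await()` returns once every request has either succeeded or failed. Here is a self-contained sketch of the same idea without an Elasticsearch cluster. `performAsync` and `Listener` are hypothetical stand-ins for `performRequestAsync` and `ResponseListener`, and the rule "even ids succeed, odd ids fail" is only there to exercise both paths:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchDemo {
    interface Listener {
        void onSuccess(String body);
        void onFailure(Exception e);
    }

    // Hypothetical async call: completes on a pool thread and invokes the listener.
    static void performAsync(ExecutorService pool, int id, Listener listener) {
        pool.execute(() -> {
            if (id % 2 == 0) listener.onSuccess("ok-" + id);
            else listener.onFailure(new RuntimeException("failed-" + id));
        });
    }

    // Fires n requests and waits for all callbacks; returns {successes, failures}.
    static int[] runAll(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(n);
        AtomicInteger ok = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                performAsync(pool, i, new Listener() {
                    @Override public void onSuccess(String body) {
                        ok.incrementAndGet();
                        latch.countDown();   // count down on success...
                    }
                    @Override public void onFailure(Exception e) {
                        failed.incrementAndGet();
                        latch.countDown();   // ...and on failure, or await() would hang
                    }
                });
            }
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for callbacks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] {ok.get(), failed.get()};
    }
}
```

Using `await` with a timeout, as in this sketch, avoids blocking forever if a callback is never invoked.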

Using TLS:

// Option 1: load an existing PKCS#12 truststore
Path trustStorePath = Paths.get("/path/to/truststore.p12");
KeyStore truststore = KeyStore.getInstance("pkcs12");
try (InputStream is = Files.newInputStream(trustStorePath)) {
	truststore.load(is, keyStorePass.toCharArray());
}
// Option 2: build a truststore from a CA certificate
Path caCertificatePath = Paths.get("/path/to/ca.crt");
CertificateFactory factory = CertificateFactory.getInstance("X.509");
Certificate trustedCa;
try (InputStream is = Files.newInputStream(caCertificatePath)) {
	trustedCa = factory.generateCertificate(is);
}
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);


SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
final SSLContext sslContext = sslBuilder.build();
RestClient.builder(new HttpHost("localhost", 9200, "https"))
		.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
			@Override
			public HttpAsyncClientBuilder customizeHttpClient(
					HttpAsyncClientBuilder httpClientBuilder) {
				return httpClientBuilder.setSSLContext(sslContext);
			}
		});

Sniffer: automatically discovers nodes in a running Elasticsearch cluster and sets them on an existing RestClient instance.

High Level REST Client

The client is backward compatible across versions.

Again, add the Maven dependency:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.15.2</version>
</dependency>

With spring-boot 2.1.8.RELEASE this fails with a NoClassDefFoundError because the versions must match, so a separate non-Spring-Boot project was created to test the API.

Connecting and closing:

RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));

client.close();

Several ways to build an analyze request:

AnalyzeRequest request = AnalyzeRequest.withGlobalAnalyzer("english", "Some text to analyze", "Some more text to analyze");

// Custom analyzer
Map<String, Object> stopFilter = new HashMap<>();
stopFilter.put("type", "stop");
stopFilter.put("stopwords", new String[]{"to"});
AnalyzeRequest request2 = AnalyzeRequest.buildCustomAnalyzer("standard")
		.addCharFilter("html_strip")
		.addTokenFilter("lowercase")
		.addTokenFilter(stopFilter)
		.build("Some text to analyze");

AnalyzeRequest request3 = AnalyzeRequest.buildCustomNormalizer()
		.addTokenFilter("lowercase")
		.build("<b>BaR</b>");

AnalyzeRequest request4 = AnalyzeRequest.withIndexAnalyzer(
		"my_index",
		"my_analyzer",
		"some text to analyze"
);
AnalyzeRequest request5 = AnalyzeRequest.withNormalizer(
		"my_index",
		"my_normalizer",
		"some text to analyze"
);

AnalyzeRequest request6 = AnalyzeRequest.withField("my_index", "my_field", "some text to analyze");

request.explain(true);
// request.attributes("keyword", "type");

AnalyzeResponse response = client.indices().analyze(request3, RequestOptions.DEFAULT);
if (response.detail() != null) {
	for (AnalyzeResponse.AnalyzeToken token : response.detail().analyzer().getTokens() ) {
		System.out.println(token.getTerm());
	}
}
if (response.getTokens() != null) {
	response.getTokens().forEach(token -> System.out.println(token.getTerm()));
}

Several ways to create an index:

CreateIndexRequest request = new CreateIndexRequest("twitter");
// Settings
request.settings(Settings.builder()
		.put("index.number_of_shards", 3)
		.put("index.number_of_replicas", 2)
);

// Mapping, option 1: JSON string
request.mapping("{\n" +
				"  \"properties\": {\n" +
				"    \"message\": {\n" +
				"      \"type\": \"text\"\n" +
				"    }\n" +
				"  }\n" +
				"}",
XContentType.JSON);

// Mapping, option 2: Map
Map<String, Object> message = new HashMap<>();
message.put("type", "text");
Map<String, Object> properties = new HashMap<>();
properties.put("message", message);
Map<String, Object> mapping = new HashMap<>();
mapping.put("properties", properties);
request.mapping(mapping);

// Mapping, option 3: XContentBuilder
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
	builder.startObject("properties");
	{
		builder.startObject("message");
		{
			builder.field("type", "text");
		}
		builder.endObject();
	}
	builder.endObject();
}
builder.endObject();
request.mapping(builder);

// Alias
request.alias(new Alias("twitter_alias"));


// Alternatively, provide the whole source (settings, mappings, aliases) as JSON
request.source("{\n" +
		"    \"settings\" : {\n" +
		"        \"number_of_shards\" : 1,\n" +
		"        \"number_of_replicas\" : 0\n" +
		"    },\n" +
		"    \"mappings\" : {\n" +
		"        \"properties\" : {\n" +
		"            \"message\" : { \"type\" : \"text\" }\n" +
		"        }\n" +
		"    },\n" +
		"    \"aliases\" : {\n" +
		"        \"twitter_alias\" : {}\n" +
		"    }\n" +
		"}", XContentType.JSON);
		
// Timeout and other options
request.setTimeout(TimeValue.timeValueMinutes(2)); 		

// Synchronous
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
System.out.println(createIndexResponse.index() + ":" + createIndexResponse.isAcknowledged() + "," + createIndexResponse.isShardsAcknowledged());

Deleting an index:

try {
	DeleteIndexRequest request = new DeleteIndexRequest("twitter");
	AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
	System.out.println(deleteIndexResponse.isAcknowledged());
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.NOT_FOUND) {
        System.out.println("NOT_FOUND");
    }
}

Updating a mapping:

PutMappingRequest request = new PutMappingRequest("twitter");
request.source(
    "{\n" +
    "  \"properties\": {\n" +
    "    \"message\": {\n" +
    "      \"type\": \"text\"\n" +
    "    }\n" +
    "  }\n" +
    "}", 
    XContentType.JSON);
AcknowledgedResponse putMappingResponse = client.indices().putMapping(request, RequestOptions.DEFAULT);

Index APIs

Indexing a document

Several ways to build the request:

IndexRequest request = new IndexRequest("test");
request.id("1");
String jsonString = "{" +
		"\"user\":\"kimchy\"," +
		"\"postDate\":\"2021-12-01\"," +
		"\"message\":\"trying out Elasticsearch\"" +
		"}";
request.source(jsonString, XContentType.JSON);

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("test").id("1").source(jsonMap);

IndexRequest indexRequest2 = new IndexRequest("test")
		.id("1")
		.source("user", "kimchy",
				"postDate", new Date(),
				"message", "trying out Elasticsearch");

				
// Timeout
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
// Refresh policy
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// Version; an exact version cannot be specified when the op type is create
request.version(1);
request.versionType(VersionType.EXTERNAL);
// Op type; with create, a duplicate id causes a conflict
request.opType(DocWriteRequest.OpType.CREATE);
request.opType("create");


// Synchronous
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
// Asynchronous
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
	@Override
	public void onResponse(IndexResponse indexResponse) {
		// doSomething
	}
	@Override
	public void onFailure(Exception e) {
		// doSomething
	}
};
client.indexAsync(request, RequestOptions.DEFAULT, listener);

Printing the result:

System.out.println("index: " + indexResponse.getIndex() + ", id: " + indexResponse.getId());
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
	System.out.println("created");
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
	System.out.println("updated");
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
	System.out.println("less");
}
if (shardInfo.getFailed() > 0) {
	for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
		System.out.println("fail: " + failure.reason());
	}
}

Output of the first run:
index: test, id: 1
created
success

Output of the second run:
index: test, id: 1
updated
success

Simulating a version conflict:

IndexRequest request = new IndexRequest("test").id("1").source("field", "value")
                                    .setIfSeqNo(10L).setIfPrimaryTerm(20);
try {
	IndexResponse response = client.index(request, RequestOptions.DEFAULT);
	System.out.println(response.getResult());
} catch(ElasticsearchException e) {
	if (e.status() == RestStatus.CONFLICT) {
		System.out.println("conflict");
	}
} catch (Exception e) {
	System.out.println(e);
}
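
The conflict arises because the write is conditional: it is applied only if the document's current sequence number and primary term equal the values sent with the request. The in-memory sketch below illustrates that compare-then-write rule. `VersionedDoc` and `VersionConflictException` are hypothetical names, and the fixed primary term of 1 is an assumption made to keep the example small:

```java
final class VersionedDoc {
    static final class VersionConflictException extends RuntimeException {
        VersionConflictException(String message) { super(message); }
    }

    private long seqNo = -1;             // -1 means "never written" in this sketch
    private final long primaryTerm = 1;  // assumption: the primary never changes
    private String source;

    // Unconditional write: always succeeds and bumps the sequence number.
    synchronized long index(String newSource) {
        source = newSource;
        return ++seqNo;
    }

    // Conditional write: applied only if the caller saw the latest version.
    synchronized long indexIf(String newSource, long ifSeqNo, long ifPrimaryTerm) {
        if (ifSeqNo != seqNo || ifPrimaryTerm != primaryTerm) {
            throw new VersionConflictException(
                "required seqNo [" + ifSeqNo + "], primary term [" + ifPrimaryTerm
                + "], current seqNo [" + seqNo + "], primary term [" + primaryTerm + "]");
        }
        return index(newSource);
    }

    synchronized String source() { return source; }
}
```

A caller that reads a document, remembers its sequence number and primary term, and writes back with `indexIf` gets a conflict instead of silently overwriting a concurrent change.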

A conflict also occurs when opType is create and a document with the same id already exists in the index:

IndexRequest request = new IndexRequest("test").id("1").source("field", "value")
							.opType(DocWriteRequest.OpType.CREATE);

GET

GetRequest getRequest = new GetRequest("test", "1");
// Do not fetch _source
// getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
// Fields to include and exclude
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
// String[] includes = Strings.EMPTY_ARRAY;
// String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext);
// Other options
// getRequest.routing("routing");
// getRequest.storedFields("message");

// Synchronous
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
System.out.println(getResponse.getIndex() + "/" + getResponse.getId() + ": " + getResponse.getFields());
if (getResponse.isExists()) {
	getResponse.getSource().forEach((k, v) -> System.out.println(k + ": " + v));
	String sourceAsString = getResponse.getSourceAsString();
	Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
}

// Asynchronous
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
	@Override
	public void onResponse(GetResponse getResponse) {
		System.out.println(getResponse.isExists());
	}

	@Override
	public void onFailure(Exception e) {
		e.printStackTrace();
	}
};
client.getAsync(getRequest, RequestOptions.DEFAULT, listener);
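
The include and exclude arrays used for source filtering accept `*` wildcards, as in `"*Date"`. A rough sketch of how such filtering behaves on top-level field names follows. `SourceFilter` is a hypothetical helper; real source filtering also handles nested paths, which this sketch deliberately ignores:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class SourceFilter {
    // Convert a simple wildcard ("*Date") to a regex; only '*' is special.
    static Pattern toRegex(String wildcard) {
        String[] parts = wildcard.split("\\*", -1);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) sb.append(".*");
            sb.append(Pattern.quote(parts[i]));
        }
        return Pattern.compile(sb.toString());
    }

    static boolean matchesAny(String field, String[] patterns) {
        for (String p : patterns) {
            if (toRegex(p).matcher(field).matches()) return true;
        }
        return false;
    }

    // Keep a field if it matches an include (or includes is empty) and matches no exclude.
    static Map<String, Object> filter(Map<String, Object> source,
                                      String[] includes, String[] excludes) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : source.entrySet()) {
            boolean included = includes.length == 0 || matchesAny(e.getKey(), includes);
            if (included && !matchesAny(e.getKey(), excludes)) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```

With a source containing `user`, `postDate` and `message`, the includes `{"message", "*Date"}` keep `postDate` and `message` and drop `user`.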

If the requested index does not exist, an ElasticsearchException is thrown:

catch (ElasticsearchException e) {
	if (e.status() == RestStatus.NOT_FOUND) {
		System.out.println("404");
	}
}

Requesting a specific version also throws an exception when the stored document has a different version:

try {
    GetRequest request = new GetRequest("posts", "1").version(2);
    GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        System.out.println("409");
    }
}

To fetch only the _source field, use GetSourceRequest, which works much like GetRequest:

GetSourceRequest getSourceRequest = new GetSourceRequest("test","1");
GetSourceResponse response = client.getSource(getSourceRequest, RequestOptions.DEFAULT);
response.getSource().forEach((k, v) -> System.out.println(k + ": " + v));

EXISTS

To check only whether a document exists:

GetRequest getRequest = new GetRequest("test", "1");
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");

// Synchronous
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
// Asynchronous
// client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
System.out.println("exists: " + exists);

DELETE

DeleteRequest request = new DeleteRequest("test", "1"); // .setIfSeqNo(100).setIfPrimaryTerm(2)
request.timeout("2m");

// Synchronous
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
	System.out.println("404");
	return;
}
System.out.println(deleteResponse.getIndex() + "/" + deleteResponse.getId() + ": " + deleteResponse.getVersion());
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
	System.out.println("less");
}
if (shardInfo.getFailed() > 0) {
	for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
		System.out.println(failure.reason());
	}
}

// Asynchronous
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
	@Override
	public void onResponse(DeleteResponse deleteResponse) {
		System.out.println(deleteResponse.getShardInfo().getSuccessful());
	}

	@Override
	public void onFailure(Exception e) {
		System.out.println("onFailure");
	}
};
client.deleteAsync(request, RequestOptions.DEFAULT, listener);

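As with get, deleting a document that does not exist yields DocWriteResponse.Result.NOT_FOUND, and a version conflict raises an ElasticsearchException with status RestStatus.CONFLICT.

UPDATE

Several ways to do a partial update: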

// String jsonString = "{" +
//  	"\"updated\":\"2021-12-01\"," +
//  	"\"reason\":\"daily update\"" +
//  "}";
// UpdateRequest request = new UpdateRequest("test", "1").doc(jsonString, XContentType.JSON);

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("total", 10);
jsonMap.put("message", "new update message");
UpdateRequest request = new UpdateRequest("test", "1").doc(jsonMap);

// XContentBuilder builder = XContentFactory.jsonBuilder();
// builder.startObject();
// {
//     builder.timeField("updated", new Date());
//     builder.field("reason", "daily update");
// }
// builder.endObject();
// UpdateRequest request = new UpdateRequest("test", "1").doc(builder);

// UpdateRequest request = new UpdateRequest("test", "1")
//                          .doc("updated", new Date(), "reason", "daily update");

// Upsert
// String jsonString = "{\"created\":\"2017-01-01\"}";
// request.upsert(jsonString, XContentType.JSON);
request.fetchSource(true);
request.docAsUpsert(true);
// request.timeout("1s");
// request.retryOnConflict(3);
// String[] includes = new String[]{"updated", "r*"};
// String[] excludes = Strings.EMPTY_ARRAY;
// request.fetchSource(new FetchSourceContext(true, includes, excludes));
// request.detectNoop(false);
// ..

Executing the update and handling the result:

UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
// Asynchronous
// client.updateAsync(request, RequestOptions.DEFAULT, listener);

System.out.println(updateResponse.getIndex() + "/" + updateResponse.getId() + ": " + updateResponse.getVersion());
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
	System.out.println("CREATED");
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
	System.out.println("UPDATED");	// UPDATED
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
	System.out.println("DELETED");
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
	System.out.println("NOOP");
}
GetResult result = updateResponse.getGetResult();
if (result.isExists()) {
	String sourceAsString = result.sourceAsString();
	Map<String, Object> sourceAsMap = result.sourceAsMap();
	sourceAsMap.forEach((k, v) -> System.out.println(k + ": " + v));
} else {
	System.out.println("not exists");
}
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
	System.out.println("less");
}
if (shardInfo.getFailed() > 0) {
	for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
		System.out.println("failure: " + failure.reason());
	}
}
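
The CREATED, UPDATED and NOOP results above follow from how a partial document is merged into the stored one. Here is a simplified in-memory sketch of that merge, assuming flat documents and no-op detection switched on. `PartialUpdate` is a hypothetical helper, and the `Result` enum only mirrors the names used in the response handling:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class PartialUpdate {
    enum Result { CREATED, UPDATED, NOOP }

    // Merge `partial` into the document stored under `id`.
    static Result apply(Map<String, Map<String, Object>> index, String id,
                        Map<String, Object> partial, boolean docAsUpsert) {
        Map<String, Object> existing = index.get(id);
        if (existing == null) {
            if (!docAsUpsert) {
                throw new IllegalStateException("document missing: " + id);
            }
            index.put(id, new HashMap<>(partial));   // the partial doc becomes the new doc
            return Result.CREATED;
        }
        boolean changed = false;
        for (Map.Entry<String, Object> e : partial.entrySet()) {
            if (!Objects.equals(existing.get(e.getKey()), e.getValue())) {
                existing.put(e.getKey(), e.getValue());
                changed = true;
            }
        }
        // With no-op detection, a merge that changes nothing is reported as NOOP.
        return changed ? Result.UPDATED : Result.NOOP;
    }
}
```

Sending the same partial document twice therefore gives CREATED (or UPDATED) the first time and NOOP the second, unless no-op detection is disabled as with `request.detectNoop(false)`.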

Using a script:

UpdateRequest request = new UpdateRequest("test", "1");

// Inline script
Map<String, Object> parameters = Collections.singletonMap("count", 4);
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.total += params.count", parameters);
request.script(inline);

// Stored script
// Script stored = new Script( ScriptType.STORED, null, "script_id", parameters);
// request.script(stored);

request.scriptedUpsert(true);

UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
System.out.println(updateResponse.getResult());		// UPDATED

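Likewise, updating a document that does not exist (with no upsert) fails with status RestStatus.NOT_FOUND, and a version conflict fails with RestStatus.CONFLICT; both surface as an ElasticsearchException.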

Term vectors

Term vectors are statistics about the terms in a document's fields.

TermVectorsRequest request = new TermVectorsRequest("test", "1");
request.setFields("user");

XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("message", "trying out Elasticsearch").endObject();
TermVectorsRequest request2 = new TermVectorsRequest("test", docBuilder);

// Synchronous
TermVectorsResponse response = client.termvectors(request, RequestOptions.DEFAULT);
TermVectorsResponse response2 = client.termvectors(request2, RequestOptions.DEFAULT);
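
To make "statistics about terms" concrete, the sketch below computes, for a piece of text, each term's positions, from which the term frequency follows. It is only an approximation: the lowercase-and-split tokenizer is an assumption standing in for a real analyzer, and `TermStats` is a hypothetical helper:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

final class TermStats {
    // Map each term to the token positions at which it occurs.
    static Map<String, List<Integer>> positions(String text) {
        Map<String, List<Integer>> out = new TreeMap<>();
        int position = 0;
        for (String token : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+")) {
            if (token.isEmpty()) continue;   // split can yield a leading empty string
            out.computeIfAbsent(token, k -> new ArrayList<>()).add(position++);
        }
        return out;
    }

    // Term frequency is simply the number of recorded positions.
    static int termFrequency(String text, String term) {
        List<Integer> p = positions(text).get(term);
        return p == null ? 0 : p.size();
    }
}
```

For the text "trying out Elasticsearch out", the term `out` occurs at positions 1 and 3 and so has a frequency of 2.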

Multi-document operations

Bulk operations

BulkRequest request = new BulkRequest();
request.add(new IndexRequest("book").id("1").source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("book").id("2").source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("book").id("3").source(XContentType.JSON,"field", "baz"));

BulkRequest request2 = new BulkRequest();
request2.add(new DeleteRequest("book", "3"));
request2.add(new UpdateRequest("book", "2").doc(XContentType.JSON,"other", "test"));
request2.add(new IndexRequest("book").id("4").source(XContentType.JSON,"field", "baz"));

// Synchronous
BulkResponse bulkResponse = client.bulk(request2, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
	if (bulkItemResponse.isFailed()) {
		BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
	}
	DocWriteResponse itemResponse = bulkItemResponse.getResponse();
	switch (bulkItemResponse.getOpType()) {
		case INDEX:
		case CREATE:
			IndexResponse indexResponse = (IndexResponse) itemResponse;
			break;
		case UPDATE:
			UpdateResponse updateResponse = (UpdateResponse) itemResponse;
			break;
		case DELETE:
			DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
	}
}

The bulk request can be configured in the same way, and it can also be executed asynchronously.

BulkProcessor simplifies bulk usage:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
	@Override
	public void beforeBulk(long executionId, BulkRequest request) {
		System.out.println("before");
	}
	@Override
	public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
		System.out.println("after: " + response.hasFailures());
	}
	@Override
	public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
		System.out.println("failure");
	}
};

BulkProcessor.Builder builder = BulkProcessor.builder(
		(request, bulkListener) ->
				client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
		listener, "bulk-processor-name");
builder.setBulkActions(500);
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
builder.setConcurrentRequests(0);
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
BulkProcessor bulkProcessor = builder.build();

IndexRequest one = new IndexRequest("book").id("4")
		.source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("book").id("5")
		.source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("book").id("6")
		.source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
System.out.println(terminated);
bulkProcessor.close();
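
The core idea behind `setBulkActions(500)` is that requests are buffered and sent as one batch once a threshold is reached, with any remainder flushed on close. A stripped-down sketch of that mechanism follows. `BatchBuffer` is a hypothetical class that flushes on action count only and is not thread-safe; the size-based and interval-based flushing configured above is left out:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchBuffer<T> {
    private final int maxActions;
    private final Consumer<List<T>> flusher;
    private List<T> pending = new ArrayList<>();

    BatchBuffer(int maxActions, Consumer<List<T>> flusher) {
        if (maxActions < 1) throw new IllegalArgumentException("maxActions must be >= 1");
        this.maxActions = maxActions;
        this.flusher = flusher;
    }

    // Buffer one item and flush automatically when the threshold is reached.
    void add(T item) {
        pending.add(item);
        if (pending.size() >= maxActions) flush();
    }

    // Hand the current batch to the flusher and start a fresh buffer.
    void flush() {
        if (pending.isEmpty()) return;
        List<T> batch = pending;
        pending = new ArrayList<>();
        flusher.accept(batch);
    }

    // Flush whatever is left, mirroring what closing a processor is expected to do.
    void close() {
        flush();
    }
}
```

With a threshold of 2 and three added items, the flusher is called once with two items and once more on close with the remaining one.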

Multi-get:

MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("book", "1").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
request.add(new MultiGetRequest.Item("book", "2"));
String[] includes = new String[] {"foo", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("book", "3").fetchSourceContext(fetchSourceContext));
// By default all returned fields come from _source; for a field to appear under fields, its store attribute must be set to true when the index is created
request.add(new MultiGetRequest.Item("book", "4").storedFields("title"));

// Synchronous
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
for (MultiGetItemResponse item : response.getResponses()) {
	System.out.println(JSON.toJSONString(item));
}

Batch operations by query

Update by query:

UpdateByQueryRequest request = new UpdateByQueryRequest("test", "test2..");
request.setQuery(new TermQueryBuilder("user", "kimchy"));
request.setConflicts("proceed");
request.setMaxDocs(10);
// request.setBatchSize(100);
request.setScript(new Script(ScriptType.INLINE, "painless",
				"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
				Collections.emptyMap()));
// request.setSlices(2);
// request.setScroll(TimeValue.timeValueMinutes(10));
request.setRouting("=cat");
request.setTimeout(TimeValue.timeValueMinutes(2));
request.setRefresh(true);

// Synchronous
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);

Delete by query is similar:

DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2"); 
request.setQuery(new TermQueryBuilder("user", "kimchy"));
// ...
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);

Multi term vectors

// Option 1
MultiTermVectorsRequest request = new MultiTermVectorsRequest();
TermVectorsRequest tvrequest1 = new TermVectorsRequest("test", "1");
tvrequest1.setFields("user");
request.add(tvrequest1);

XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("user", "??").endObject();
TermVectorsRequest tvrequest2 = new TermVectorsRequest("test", docBuilder);
request.add(tvrequest2);

// Option 2
TermVectorsRequest tvrequestTemplate = new TermVectorsRequest("test", "fake_id");
tvrequestTemplate.setFields("user");
String[] ids = {"1", "2"};
MultiTermVectorsRequest request2 = new MultiTermVectorsRequest(ids, tvrequestTemplate);

// Synchronous
MultiTermVectorsResponse response = client.mtermvectors(request2, RequestOptions.DEFAULT);
List<TermVectorsResponse> tvresponseList = response.getTermVectorsResponses();
if (tvresponseList != null) {
	for (TermVectorsResponse tvresponse : tvresponseList) {
		System.out.println(tvresponse);
	}
}

Search

// SearchRequest searchRequest = new SearchRequest();
// searchRequest.indices("test");
SearchRequest searchRequest = new SearchRequest("test");
// searchRequest.routing("routing");

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// sourceBuilder .query(QueryBuilders.matchAllQuery());
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy2"));
sourceBuilder.from(0);
sourceBuilder.size(5);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
// sourceBuilder.sort(new FieldSortBuilder("id").order(SortOrder.ASC));
// sourceBuilder.fetchSource(false);
// String[] includeFields = new String[] {"title", "innerObject.*"};
// String[] excludeFields = new String[] {"user"};
// sourceBuilder.fetchSource(includeFields, excludeFields);
searchRequest.source(sourceBuilder);

// Synchronous
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse.status().getStatus());
SearchHits hits = searchResponse.getHits();
System.out.println(hits.getTotalHits().value);
for (SearchHit hit : hits.getHits()) {
	System.out.println("============ " + hit.getIndex() + ": " + hit.getId());
	hit.getSourceAsMap().forEach((k, v) -> System.out.println(k + ": " + v));
	System.out.println(hit.getHighlightFields());
}
// Aggregations aggregations = searchResponse.getAggregations();
// Suggest suggest = searchResponse.getSuggest();
// Map<String, ProfileShardResult> profilingResults = searchResponse.getProfileResults();

Other ways to build queries, plus highlighting, aggregations, and suggestions:

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy");
matchQueryBuilder.fuzziness(Fuzziness.AUTO);
matchQueryBuilder.prefixLength(3);
matchQueryBuilder.maxExpansions(10);

QueryBuilder matchQueryBuilder2 = QueryBuilders.matchQuery("user", "kimchy")
		.fuzziness(Fuzziness.AUTO)
		.prefixLength(3)
		.maxExpansions(10);
searchSourceBuilder.query(matchQueryBuilder);
// searchSourceBuilder.profile(true);

// Highlighting
HighlightBuilder highlightBuilder = new HighlightBuilder();
HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title");
highlightTitle.highlighterType("unified");
highlightBuilder.field(highlightTitle);
HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
highlightBuilder.field(highlightUser);
searchSourceBuilder.highlighter(highlightBuilder);

// Aggregations
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company").field("company.keyword");
aggregation.subAggregation(AggregationBuilders.avg("average_age").field("age"));
searchSourceBuilder.aggregation(aggregation);

// Suggestions
SuggestionBuilder termSuggestionBuilder = SuggestBuilders.termSuggestion("user").text("kmichy");
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
searchSourceBuilder.suggest(suggestBuilder);

Scroll search:

SearchRequest searchRequest = new SearchRequest("test");
// ...
searchRequest.scroll(TimeValue.timeValueMinutes(1L)); 

SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();

// While searchHits is non-empty, keep scrolling with the scrollId
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(TimeValue.timeValueSeconds(30));
SearchResponse searchScrollResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = searchScrollResponse.getScrollId();

// Release the scroll context once finished
ClearScrollRequest request = new ClearScrollRequest();
request.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(request, RequestOptions.DEFAULT);
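
The snippet above shows a single scroll step; in practice it is repeated until a page comes back empty. The control flow can be sketched without a cluster as below. `Cursor` is a hypothetical in-memory stand-in for the server-side scroll context, and `nextPage()` plays the role of both the initial search and each scroll request:

```java
import java.util.ArrayList;
import java.util.List;

final class ScrollDemo {
    // Hypothetical scroll context: remembers how far the client has read.
    static final class Cursor {
        private final List<String> docs;
        private final int pageSize;
        private int offset = 0;

        Cursor(List<String> docs, int pageSize) {
            this.docs = docs;
            this.pageSize = pageSize;
        }

        List<String> nextPage() {
            int end = Math.min(offset + pageSize, docs.size());
            List<String> page = new ArrayList<>(docs.subList(offset, end));
            offset = end;
            return page;
        }
    }

    // Read every document by requesting pages until one comes back empty.
    static List<String> scrollAll(List<String> docs, int pageSize) {
        Cursor cursor = new Cursor(docs, pageSize);
        List<String> all = new ArrayList<>();
        List<String> page = cursor.nextPage();   // corresponds to the initial search
        while (!page.isEmpty()) {
            all.addAll(page);
            page = cursor.nextPage();            // corresponds to a scroll request
        }
        // A real client would clear the scroll here to free server resources.
        return all;
    }
}
```

The empty page is the termination signal, which is why the loop checks the hits rather than counting pages up front.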

Using a search template:

SearchTemplateRequest request = new SearchTemplateRequest();
request.setRequest(new SearchRequest("test"));

// request.setScriptType(ScriptType.STORED);
// request.setScript("message_search");
request.setScriptType(ScriptType.INLINE);
request.setScript("{" +
		"  \"query\": { \"match\" : { \"{{field}}\" : \"{{value}}\" } }," +
		"  \"size\" : \"{{size}}\"" +
		"}");
Map<String, Object> scriptParams = new HashMap<>();
scriptParams.put("field", "message");
scriptParams.put("value", "Elasticsearch");
scriptParams.put("size", 5);
request.setScriptParams(scriptParams);

request.setExplain(true);
request.setProfile(true);
SearchTemplateResponse response = client.searchTemplate(request, RequestOptions.DEFAULT);
SearchResponse searchResponse = response.getResponse();
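
Search templates are Mustache templates: placeholders such as `{{field}}` are replaced with the values from the script parameters before the query runs. A bare-bones sketch of that substitution step is shown below. `TemplateRender` is a hypothetical helper that handles only simple `{{name}}` variables and performs no JSON escaping, unlike a real Mustache engine:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TemplateRender {
    private static final Pattern VARIABLE = Pattern.compile("\\{\\{(\\w+)\\}\\}");

    // Replace each {{name}} with params.get(name); unknown names become empty strings.
    static String render(String template, Map<String, Object> params) {
        Matcher m = VARIABLE.matcher(template);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            Object value = params.get(m.group(1));
            String text = value == null ? "" : value.toString();
            m.appendReplacement(sb, Matcher.quoteReplacement(text));
        }
        m.appendTail(sb);
        return sb.toString();
    }
}
```

Rendering the inline template with `field=message`, `value=Elasticsearch` and `size=5` yields a match query on the `message` field with a size of 5.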

If only the number of matching documents is needed, use CountRequest:

CountRequest countRequest = new CountRequest("test");
QueryBuilder queryBuilder = QueryBuilders.matchQuery("user", "kimchy2");
countRequest.query(queryBuilder);

CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
System.out.println(countResponse.status());
System.out.println(countResponse.getCount());

Index APIs

Java Client

Two ways to create the client:

// Option 1: build directly on a low-level RestClient
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

// Option 2: reuse the low-level client of an existing RestHighLevelClient
RestClientBuilder httpClientBuilder = RestClient.builder(new HttpHost("localhost", 9200));
RestHighLevelClient hlrc = new RestHighLevelClient(httpClientBuilder);
RestClientTransport hlrcTransport = new RestClientTransport(hlrc.getLowLevelClient(), new JacksonJsonpMapper());
ElasticsearchClient esClient = new ElasticsearchClient(hlrcTransport);

Creating an index:

CreateIndexResponse res = client.indices().create(c -> c.index("products"));

CreateIndexResponse createResponse = client.indices()
		.create(createIndexBuilder -> createIndexBuilder
				// index names must be lowercase
				.index("my-index")
				.aliases("abc", aliasBuilder -> aliasBuilder.isWriteIndex(true))
				// .mappings(JsonValue)
		);

Searching:

SearchResponse<Object> search = client.search(s -> s
				.index("test")
				.query(q -> q
						.term(t -> t
								.field("user")
								.value("kimchy3")
						)),
		Object.class);

for (Hit<Object> hit: search.hits().hits()) {
	System.out.println(hit.source());
}

