一、java操作ES有两种客户端:
1、TransportClient:轻量级的Client,使用Netty线程池,Socket连接到ES集群。本身不加入到集群,只作为请求的处理。
2、Node Client:客户端节点本身也是ES节点,加入到集群,和其他ElasticSearch节点一样。频繁的开启和关闭这类Node Clients会在集群中产生“噪音”。
二、TransportClient的基本使用
1、创建Client
public ElasticSearchService(String ipAddress, int port) {
client = new TransportClient()
.addTransportAddress(new InetSocketTransportAddress(ipAddress,
port));
}
2、创建/删除Index和Type信息
// 创建索引
public void createIndex() {
client.admin().indices().create(new CreateIndexRequest(IndexName))
.actionGet();
}
// 清除所有索引
public void deleteIndex() {
IndicesExistsResponse indicesExistsResponse = client.admin().indices()
.exists(new IndicesExistsRequest(new String[] { IndexName }))
.actionGet();
if (indicesExistsResponse.isExists()) {
client.admin().indices().delete(new DeleteIndexRequest(IndexName))
.actionGet();
}
}
// 删除Index下的某个Type
public void deleteType(){
client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();
}
// 定义索引的映射类型
public void defineIndexTypeMapping() {
try {
XContentBuilder mapBuilder = XContentFactory.jsonBuilder();
mapBuilder.startObject()
.startObject(TypeName)
.startObject("properties")
.startObject(IDFieldName).field("type", "long").field("store", "yes").endObject()
.startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject()
.startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
.startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject()
.endObject()
.endObject()
.endObject();
PutMappingRequest putMappingRequest = Requests
.putMappingRequest(IndexName).type(TypeName)
.source(mapBuilder);
client.admin().indices().putMapping(putMappingRequest).actionGet();
} catch (IOException e) {
log.error(e.toString());
}
}
这里自定义了某个Type的索引映射(Mapping),默认ES会自动处理数据类型的映射:针对整型映射为long,浮点数为double,字符串映射为string,时间为date,true或false为boolean。
注意:针对字符串,ES默认会做“analyzed”处理,即先做分词、去掉stop words等处理再index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field("index", "not_analyzed")。
详情参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html
3、索引数据
// 批量索引数据
public void indexHotSpotDataList(List<Hotspotdata> dataList) {
if (dataList != null) {
int size = dataList.size();
if (size > 0) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < size; ++i) {
Hotspotdata data = dataList.get(i);
String jsonSource = getIndexDataFromHotspotData(data);
if (jsonSource != null) {
bulkRequest.add(client
.prepareIndex(IndexName, TypeName,
data.getId().toString())
.setRefresh(true).setSource(jsonSource));
}
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
Iterator<BulkItemResponse> iter = bulkResponse.iterator();
while (iter.hasNext()) {
BulkItemResponse itemResponse = iter.next();
if (itemResponse.isFailed()) {
log.error(itemResponse.getFailureMessage());
}
}
}
}
}
}
// 索引数据
public boolean indexHotspotData(Hotspotdata data) {
String jsonSource = getIndexDataFromHotspotData(data);
if (jsonSource != null) {
IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,
TypeName).setRefresh(true);
requestBuilder.setSource(jsonSource)
.execute().actionGet();
return true;
}
return false;
}
// 得到索引字符串
public String getIndexDataFromHotspotData(Hotspotdata data) {
String jsonString = null;
if (data != null) {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jsonBuilder.startObject().field(IDFieldName, data.getId())
.field(SeqNumFieldName, data.getSeqNum())
.field(IMSIFieldName, data.getImsi())
.field(IMEIFieldName, data.getImei())
.field(DeviceIDFieldName, data.getDeviceID())
.field(OwnAreaFieldName, data.getOwnArea())
.field(TeleOperFieldName, data.getTeleOper())
.field(TimeFieldName, data.getCollectTime())
.endObject();
jsonString = jsonBuilder.string();
} catch (IOException e) {
log.equals(e);
}
}
return jsonString;
}
ES支持批量和单个数据索引。
4、查询获取数据
// 获取少量数据100个
private List<Integer> getSearchData(QueryBuilder queryBuilder) {
List<Integer> ids = new ArrayList<>();
SearchResponse searchResponse = client.prepareSearch(IndexName)
.setTypes(TypeName).setQuery(queryBuilder).setSize(100)
.execute().actionGet();
SearchHits searchHits = searchResponse.getHits();
for (SearchHit searchHit : searchHits) {
Integer id = (Integer) searchHit.getSource().get("id");
ids.add(id);
}
return ids;
}
// 获取大量数据
private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {
List<Integer> ids = new ArrayList<>();
// 一次获取100000数据
SearchResponse scrollResp = client.prepareSearch(IndexName)
.setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))
.setQuery(queryBuilder).setSize(100000).execute().actionGet();
while (true) {
for (SearchHit searchHit : scrollResp.getHits().getHits()) {
Integer id = (Integer) searchHit.getSource().get(IDFieldName);
ids.add(id);
}
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(600000)).execute().actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
}
return ids;
}
这里的QueryBuilder是一个查询条件,ES支持分页查询获取数据,也可以一次性获取大量数据,需要使用Scroll Search。
5、聚合(Aggregation Facet)查询
// 得到某段时间内设备列表上每个设备的数据分布情况<设备ID,数量>
public Map<String, String> getDeviceDistributedInfo(String startTime,
String endTime, List<String> deviceList) {
Map<String, String> resultsMap = new HashMap<>();
QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);
QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);
QueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(deviceQueryBuilder).must(rangeBuilder);
TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE)
.field(DeviceIDFieldName);
SearchResponse searchResponse = client.prepareSearch(IndexName)
.setQuery(queryBuilder).addAggregation(termsBuilder)
.execute().actionGet();
Terms terms = searchResponse.getAggregations().get("DeviceIDAgg");
if (terms != null) {
for (Terms.Bucket entry : terms.getBuckets()) {
resultsMap.put(entry.getKey(),
String.valueOf(entry.getDocCount()));
}
}
return resultsMap;
}
Aggregation查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。
详情参考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html
本文参考:http://www.cnblogs.com/luxiaoxun/p/4869509.html
相关推荐
ElasticSearch Java API 中文文档 ElasticSearch Java API 中文文档
第一步:启动一个elasticsearch服务,bin目录下的elasticsearch.bat 第二步:建索引,执行MusicReviewIndex中的单元测试方法index() 第三步:检索测试,执行MusicReviewSearch中的单元测试方法search() 本实例基于: ...
Java做客户端对Elasticsearch服务的增删改查及批量修改操作,代码简洁易懂,思路清晰有注释.详情参考https://blog.csdn.net/linhaiyun_ytdx/article/category/7042758
Elasticsearch6.1.2 Java客户端,使用meven仓库创建,配置后即可使用。
Elasticsearch之Java客户端Jest的全部依赖jar包,亲测有效。
Elasticsearch的java客户端使用示例 工程说明: 一. es-hrest-client-src工程: Elasticsearch的高级别的REST客户端使用示例 二、es-java-client-src工程: Elasticsearch的TransportClient客户端使用示例 注意:...
ElasticSearch 官方 java API
Elasticsearch5.5.1 JAVA客户端RestClient示例代码,详细介绍:http://blog.csdn.net/u011781521/article/details/77853571
Elasticsearch5.5.1 JAVA客户端TransportClient相关的示例,包含CRUD,bulk,相关介绍请参考:http://blog.csdn.net/u011781521/article/details/77848489
使用过Elasticsearch RestFul API的都知道,在Java端使用是ES服务需要创建Java Client,但是每一次连接都实例化一个client,对系统的消耗很大,而且最令人头疼的是它的连接非常慢。所以为了解决上述问题并提高client...
本手册不仅详细描述了如何使用Java作为客户端访问ElasticSearch的代码和方法,同时也描述了在使用ElasticSearch遇到的坑和解决方案,是新手入门不可多得的教材,也是老手借鉴提高的材料。
elasticsearch5.5.1客户端所需jar包,可用于普通java工程使用elasticsearch客户端,解决必须用maven工程问题
Elasticsearch客户端常用代码封装实例 基于MyBatis Plus的Java High Level Client ES客户端的常用操作 如:增删改查、聚合查询、客户端封装、节点嗅探等常用代码示例
连接阿里elasticsearch连接客户端代码,elasticsearch5.3.3
相关依赖 org.elasticsearch.client elasticsearch-rest-client 6.5.4 junit junit 4.12 com.fasterxml.jackson.core jackson-databind 2.9.4 org.apache.maven.plugins
通用高性能Elasticsearch highlevel java rest client 客户端,兼容elasticsearch 1.x,2.x,5.x,6.x,7.x,8.x,兼容spring boot 1.x,2.x,开箱即用
java 连接ElasticSearch 客户端Demo
elasticsearch-5.5客户端JAVA开发需要的57个jar包
EsClientRHL是一个可基于springboot的elasticsearch客户端调用封装工具,通过elasticsearch官网推荐的RestHighLevelClient实现,内置了es索引结构工具、es索引数据增删改工具、es查询工具、es数据分析工具或者es。...