1 /*********************************************************************************
2 * Copyright © 2016, highstreet technologies GmbH
5 * http://www.highstreet-technologies.com/
7 * The reproduction, transmission or use of this document or its contents is not
8 * permitted without express written authority. Offenders will be liable for
9 * damages. All rights, including rights created by patent grant or registration
10 * of a utility model or design, are reserved. Technical modifications possible.
11 * Technical specifications and features are binding only insofar as they are
12 * specifically and expressly agreed upon in a written contract.
14 * @author: Herbert Eiselt [herbert.eiselt@highstreet-technologies.com]
15 *********************************************************************************/
16 package org.opendaylight.mwtn.base.database;
18 import java.io.IOException;
19 import java.net.InetAddress;
20 import java.net.UnknownHostException;
21 import java.nio.file.Files;
22 import java.nio.file.Paths;
23 import java.util.List;
26 import javax.annotation.Nullable;
28 import org.elasticsearch.ElasticsearchException;
29 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
30 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
31 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
32 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
33 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
34 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
35 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
36 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
37 import org.elasticsearch.action.delete.DeleteResponse;
38 import org.elasticsearch.action.get.GetResponse;
39 import org.elasticsearch.action.index.IndexRequestBuilder;
40 import org.elasticsearch.action.index.IndexResponse;
41 import org.elasticsearch.action.search.SearchResponse;
42 import org.elasticsearch.client.Client;
43 import org.elasticsearch.client.transport.TransportClient;
44 import org.elasticsearch.cluster.node.DiscoveryNode;
45 import org.elasticsearch.common.bytes.BytesReference;
46 import org.elasticsearch.common.settings.Settings;
47 import org.elasticsearch.common.transport.InetSocketTransportAddress;
48 import org.elasticsearch.common.unit.TimeValue;
49 import org.elasticsearch.index.query.QueryBuilder;
50 import org.elasticsearch.index.query.QueryBuilders;
51 import org.elasticsearch.search.SearchHit;
52 import org.json.JSONObject;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
60 public class HtDatabaseClientAbstract implements HtDataBase, AutoCloseable {
62 private final Logger log = LoggerFactory.getLogger(HtDatabaseClientAbstract.class);
64 private static int DELAYSECONDS = 10;
65 private Client client;
66 private String esIndexAlias;
69 * Full database initialization.
70 * @param esIndex Database index
71 * @param esNodeserverName Servername or Server-IP that hosts the node.
72 * @param esClusterName Name of the cluster
73 * @param esNodeName Name of the node within the cluster to connect to.
74 * @throws UnknownHostException Servername not known.
76 public HtDatabaseClientAbstract(String esIndex, String esNodeserverName, String esClusterName, String esNodeName) throws UnknownHostException {
78 this.esIndexAlias = esIndex;
80 Settings settings = Settings.settingsBuilder()
81 .put("cluster.name", esClusterName)
82 .put("node.name", esNodeName)
84 this.client = getClient(esNodeserverName, settings);
/**
 * Initialization without a host name: no transport address is registered for the client.
 *
 * @param esIndex Database index
 * @param esClusterName Name of the cluster
 * @param esNodeName Name of the node within the cluster
 * @throws UnknownHostException declared by the client creation; no host is resolved here
 */
public HtDatabaseClientAbstract(String esIndex, String esClusterName, String esNodeName) throws UnknownHostException {
    // Same settings as the four-argument constructor, with a null server name.
    this(esIndex, null, esClusterName, esNodeName);
}
107 * Simple database initialization. Query all ES configuration information from cluster node.
108 * @param esIndex Database index
109 * @param esNodeserverHostName Servername or Server-IP that hosts the node.
110 * @throws UnknownHostException Servername not known.
113 public HtDatabaseClientAbstract(String esIndex, String esNodeserverHostName) throws UnknownHostException {
115 this.esIndexAlias = esIndex;
117 Settings settings = Settings.settingsBuilder()
118 .put("client.transport.ignore_cluster_name",true)
119 .put("client.transport.sniff", true)
121 this.client = getClient(esNodeserverHostName, settings);
/**
 * Initialization that reuses the client of an already existing database node.
 *
 * @param esIndex Database index
 * @param database database node descriptor that provides the client
 */
public HtDatabaseClientAbstract(String esIndex, HtDatabaseNode database) {
    this.client = database.getClient();
    this.esIndexAlias = esIndex;
}
136 /*----------------------------------
137 * some constructing functions, used by public constructors
141 * @param esNodeserverName
144 * @throws UnknownHostException
146 private final TransportClient getClient(@Nullable String esNodeserverName, Settings settings) throws UnknownHostException {
148 TransportClient newClient = TransportClient.builder().settings(settings).build();
150 if (esNodeserverName != null) {
151 InetAddress nodeIp = InetAddress.getByName(esNodeserverName);
152 newClient.addTransportAddress(new InetSocketTransportAddress(nodeIp, 9300));
159 private void setup(TransportClient newClient) {
160 NodesInfoResponse nodeInfos = newClient.admin().cluster().prepareNodesInfo().get();
161 String clusterName = nodeInfos.getClusterName().value();
163 // ------ Debug/ Info
164 StringBuffer logInfo = new StringBuffer();
165 logInfo.append("Create ES Client an localhost for Cluster '");
166 logInfo.append(clusterName);
167 logInfo.append("' for index '");
168 logInfo.append(esIndexAlias);
169 logInfo.append("' Nodelist: ");
170 for (DiscoveryNode node : newClient.connectedNodes()) {
172 logInfo.append(node.toString());
173 logInfo.append(") ");
175 log.info(logInfo.toString());
176 // ------ Debug/ Info
178 log.info("Starting Database service. Short wait.");
180 ClusterHealthResponse nodeStatus = newClient.admin().cluster().prepareHealth()
181 .setWaitForGreenStatus()
182 //.setWaitForYellowStatus()
183 .setTimeout(TimeValue.timeValueSeconds(DELAYSECONDS))
185 log.debug("Elasticsearch client started with status {}",nodeStatus.toString());
188 List<DiscoveryNode> nodeList = newClient.connectedNodes();
190 if (nodeList.isEmpty()) {
191 log.info("ES Client created for nodes: <empty node list>");
194 for (DiscoveryNode dn : nodeList) {
195 log.info("ES Client created for node#{}: {}",t , dn.getName());
199 Runtime.getRuntime().addShutdownHook(new Thread(){
200 @Override public void run(){
201 log.info("Shutdown node "+HtDatabaseClientAbstract.class.getSimpleName());
205 log.info("Database service started.");
210 /*----------------------------------
215 public String getNetworkIndex() {
220 public void setNetworkIndex(String es_index) {
221 this.esIndexAlias = es_index;
225 public Client getClient() {
229 /*----------------------------------
236 public void close() {
241 * Create an ES index. Delete an existing index.
243 public void doDeleteIndex() {
244 log.info("Remove index {}", esIndexAlias);
246 if (esIndexAlias == null) {
247 throw new IllegalArgumentException("Missing Index");
253 IndicesExistsResponse res = client.admin().indices().prepareExists(esIndexAlias)
257 if (res.isExists()) {
258 log.info("Delete Index start: {}",esIndexAlias);
259 DeleteIndexRequestBuilder delIdx = client.admin().indices().prepareDelete(esIndexAlias);
260 delIdx.execute().actionGet();
261 log.info("Delete Index done.");
264 } catch (ElasticsearchException e) {
265 log.warn(e.getDetailedMessage());
270 * Verify if index already created
271 * @return boolean accordingly
273 public boolean isExistsIndex() {
275 if (esIndexAlias == null) {
276 throw new IllegalArgumentException("Missing Index");
279 log.debug("Check status of ES index: {}", esIndexAlias);
281 final IndicesExistsResponse indexStatus = client.admin()
283 prepareExists(esIndexAlias).
287 return indexStatus.isExists();
293 * Create and write the mapping and setting of the index
294 * @param jsonString with mapping and setting definition Object or null for no configuration
296 public void doCreateIndexWithMapping(JSONObject jsonIndexMappingSetting) {
298 if (esIndexAlias == null) {
299 throw new IllegalArgumentException("Missing Index");
303 String esIndexName = esIndexAlias+"_v1";
304 log.debug("Create not existing ES index: {} with alias:{}", esIndexName, esIndexAlias);
306 //Create index with mapping
307 CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(esIndexName);
309 if(jsonIndexMappingSetting!=null) {
310 // Handle optional mappings if requested
311 JSONObject jsonMapping = jsonIndexMappingSetting.optJSONObject("mappings");
312 if (jsonMapping != null) {
313 log.debug("Set mapping for index {} {}", esIndexAlias, jsonMapping);
314 Set<?> keys = jsonMapping.keySet();
315 log.debug("Found length:"+jsonMapping.length()+" keys:"+keys.size());
316 for (Object key : keys) {
317 String docType = (String)key;
318 log.debug("Doctype:{} mapping:{}",docType,jsonMapping.getJSONObject(docType).toString());
319 createIndexRequestBuilder.addMapping(docType, jsonMapping.getJSONObject(docType).toString());
322 log.debug("No mapping requested for index {}", esIndexAlias);
324 // Handle optional settings if requested
325 JSONObject jsonSettings = jsonIndexMappingSetting.optJSONObject("settings");
326 if (jsonSettings != null) {
327 log.debug("Set setting for index {} {}", esIndexAlias, jsonSettings);
328 createIndexRequestBuilder.setSettings(Settings.settingsBuilder().loadFromSource(jsonSettings.toString()));
330 log.debug("No settings requested for index {}", esIndexAlias);
335 CreateIndexResponse createResponse = createIndexRequestBuilder.execute().actionGet();
336 log.debug("CreateIndex response {}",createResponse);
340 log.debug("Set alias {} to index {}",esIndexAlias, esIndexName);
341 IndicesAliasesResponse setAliasResponse = client.admin().indices().prepareAliases().addAlias(esIndexName,esIndexAlias)
342 .execute().actionGet();
343 log.debug("CreateIndex response {}",setAliasResponse);
346 } catch (ElasticsearchException e) {
347 log.warn("ElasticsearchException: {}",e.getDetailedMessage());
352 * Create Index with alias according to definition, but no mapping
354 public void doCreateIndex() {
355 doCreateIndexWithMapping(null);
359 * Write a JSON mapping definition for a document from a file to ES
360 * Hint: A change of the mapping is not possible.
361 * @param documentType Document type in focus
362 * @param jsonString String with mapping definition in JSON Format
365 public void doWriteMappingJson( String jsonString) throws IllegalArgumentException {
367 if (esIndexAlias == null) {
368 throw new IllegalArgumentException("Missing Index");
370 if (jsonString == null) {
371 String s = "Mapping string parameter is null";
373 throw new IllegalArgumentException(s);
378 log.debug("Check status of ES index: {}", esIndexAlias);
380 final IndicesExistsResponse indexStatus = client.admin()
382 prepareExists(esIndexAlias).
386 if (indexStatus.isExists()) {
387 log.debug("ES index exists: {}", esIndexAlias);
388 // TODO: CHANGE Mapping is not working. This here works only for new datatypes
390 PutMappingResponse res= client.admin().indices()
391 .preparePutMapping(esIndexAlias)
392 .setSource(jsonString)
395 log.debug("Result: {}", res.toString());
398 log.debug("Create not existing ES index: {}", esIndexAlias);
400 CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(esIndexAlias);
401 createIndexRequestBuilder
402 .addMapping(jsonString)
407 } catch (ElasticsearchException e) {
408 log.warn(e.getDetailedMessage());
413 * Write a Json mapping definition for a document from a file to ES
414 * @param fileName Filename with json definition.
416 public void doWriteMappingFromFile(String fileName) {
419 log.info("Write mapping from File: {}", fileName);
421 if (esIndexAlias == null) {
422 throw new IllegalArgumentException("Missing Index");
425 if (fileName == null) {
426 log.warn("No mapping for {} specified in parameter file.", esIndexAlias);
430 String content = null;
433 content = new String(Files.readAllBytes(Paths.get(fileName)),"UTF-8");
434 } catch (IOException e1) {
435 log.warn("Can not read file: {}",e1.getMessage());
438 doWriteMappingJson(content);
443 * Write list with json objects from json files
444 * @param docTypeAndFileName List with 2 String Array.
445 * String[0] Contains the dataType name
446 * String[1] Contains the filename
448 public void doWriteJsonFiles( List<String[]> docTypeAndFileName ) {
450 log.debug("Write JSONFiles: {}", docTypeAndFileName.size());
451 if (docTypeAndFileName != null) {
453 for (String[] s : docTypeAndFileName) {
455 writeJsonObjectsFromFile( s[0], s[1]);
457 log.warn("Wrong parameters number. Entry: {}", t);
465 * Write one object into Database
466 * @param esId Database index
467 * @param dataTypeName Name of datatype
468 * @param json String in JSON format.
469 * @return esId of the object
472 public String doWrite( String dataTypeName, IsEsObject esId, String json) {
473 return doWrite(dataTypeName, esId, json.getBytes());
477 * Write one object into Database
478 * @param esId Database index
479 * @param dataTypeName Name of datatype
480 * @param json String in JSON format.
481 * @return esId of the object
485 public String doWrite( String dataTypeName, IsEsObject esId, byte[] json) {
486 return doWrite(dataTypeName, esId.getEsId(), json);
490 * Write one object into Database
491 * @param dataTypeName
492 * @param id id of the object or null
493 * @param json Object as json
494 * @return esId of the Object
496 public String dowrite(String dataTypeName, String id, JSONObject json) {
497 return doWrite(dataTypeName, id, json.toString().getBytes());
501 * Write one object into Database
502 * @param esId Database index or null
503 * @param dataTypeName Name of datatype
504 * @param json String in JSON format.
505 * @return esId of the object
508 public String doWrite( String dataTypeName, String esId, byte[] json) {
510 if (esIndexAlias == null) {
511 throw new IllegalArgumentException("Missing Index");
514 IndexRequestBuilder request = esId == null || esId.isEmpty() ?
515 client.prepareIndex(esIndexAlias, dataTypeName) :
516 client.prepareIndex(esIndexAlias, dataTypeName, esId);
518 IndexResponse response = null;
520 response = request.setSource(json).execute().actionGet();
521 } catch (ElasticsearchException e) {
522 log.warn("ES Exception {} Json: {}", e.getMessage(), new String(json));
525 if (response == null) {
526 log.warn("Response null during write: {} {}", esId, new String(json));
529 return response.getId();
534 * Write JSON Data. First level contains datatype, next level id
543 public void doWriteJSONObject(JSONObject json) {
545 Set<?> docTypes = json.keySet();
546 log.debug("Found number of keys:"+json.length()+" keys:"+docTypes.size());
547 for (Object docTypeKey : docTypes) {
548 String docType = (String)docTypeKey;
549 JSONObject objects = json.optJSONObject(docType);
550 if (objects == null) {
551 log.debug("Skip json {} with class {}",docType, json.get(docType).getClass());
553 Set<?> ids = objects.keySet();
554 log.debug("write doctype {} with elements {}",docType,ids.size());
555 for (Object idKey : ids) {
556 String id = (String)idKey;
558 JSONObject jsonIdObject = objects.optJSONObject(id);
559 if (jsonIdObject == null) {
560 log.debug("Skip jsonsub {} with class {}",id, objects.get(id).getClass());
562 if (log.isTraceEnabled()) {
563 log.trace("Jsonsub object of id {} '{}'", id, jsonIdObject);
565 this.doWrite(docType, id, jsonIdObject.toString().getBytes());
574 * Remove Object from database
577 public boolean doRemove( String dataTypeName, IsEsObject esId ) {
579 if (esIndexAlias == null) {
580 throw new IllegalArgumentException("Missing Index");
583 DeleteResponse response = client.prepareDelete(esIndexAlias, dataTypeName, esId.getEsId())
587 return response.isFound();
591 * Read Json Object from database
594 public BytesReference doReadJsonData( String dataTypeName, IsEsObject esId ) {
596 log.debug("NetworkIndex: {}",esIndexAlias);
597 if (esId.getEsId() == null) {
598 throw new IllegalArgumentException("Read access to object without database Id");
601 GetResponse response = client.prepareGet(esIndexAlias, dataTypeName, esId.getEsId())
602 //.setOperationThreaded(false)
606 BytesReference json = response.getSourceAsBytesRef();
611 public SearchHit[] doReadByQueryJsonData( int start, int length, String dataTypeName, QueryBuilder qb ) {
613 log.debug("NetworkIndex: {}",esIndexAlias);
615 SearchResponse response1 = client.prepareSearch(esIndexAlias)
616 .setTypes(dataTypeName)
618 .setFrom(start).setSize(length)
619 .execute().actionGet();
621 SearchHit hits[] = response1.getHits().hits();
627 public SearchHit[] doReadAllJsonData( int start, int length, String dataTypeName ) {
629 log.debug("NetworkIndex: {}",esIndexAlias);
632 QueryBuilder qb = QueryBuilders.matchAllQuery();
634 SearchResponse response1 = client.prepareSearch(esIndexAlias)
635 .setTypes(dataTypeName)
637 .setFrom(start).setSize(length)
638 .execute().actionGet();
640 SearchHit hits[] = response1.getHits().hits();
647 * Write Json datetype that is specified by file to ES
648 * @param dataType ES Datatype name
649 * @param fileName file name
651 public void writeJsonObjectsFromFile( String dataType, String fileName ) {
653 log.debug("Start: Index: '{}' ' datatype: '{}' File: '{}'", esIndexAlias, dataType, fileName);
655 String content = null;
658 content = new String( Files.readAllBytes(Paths.get(fileName)), "UTF-8");
659 } catch (IOException e1) {
660 log.warn("Can not read file: {}",e1.getMessage());
663 if (content != null && content.charAt(0) == 0xfeff) {
664 content = content.substring(1);
665 log.debug("Delete first char {} {}", dataType, fileName);
668 if (content != null) {
669 IndexResponse response = null;
671 response = client.prepareIndex(esIndexAlias, dataType)
675 } catch (ElasticsearchException e) {
676 log.error("ElasticsearchException during write: for {} from {}", e.getMessage(), dataType, fileName);
677 } catch (Exception e) {
678 log.error("Exception during write: for {} from {}", e.getMessage(), dataType, fileName);
681 if (response != null) {
682 if (! response.isCreated()) {
683 log.warn("Jackson Response not created: {} {} {}", response.toString(), dataType, fileName);
685 log.debug("Created: {}", response.getId());
688 log.warn("Jackson Response null after write {} {}", dataType, fileName);
695 public void closeDb() {