b1fde018a7654df0948d060360d372a1dfef496e
[ccsdk/apps.git] / sdnr / wireless-transport / code-Carbon-SR1 / apps / devicemanager / impl / src / main / java / org / opendaylight / mwtn / base / database / HtDatabaseClientAbstract.java
1 /*********************************************************************************
2  *  Copyright © 2016, highstreet technologies GmbH
3  *  All rights reserved!
4  *
5  *  http://www.highstreet-technologies.com/
6  *
7  *  The reproduction, transmission or use of this document or its contents is not
8  *  permitted without express written authority. Offenders will be liable for
9  *  damages. All rights, including rights created by patent grant or registration
10  *  of a utility model or design, are reserved. Technical modifications possible.
11  *  Technical specifications and features are binding only insofar as they are
12  *  specifically and expressly agreed upon in a written contract.
13  *
14  *  @author: Herbert Eiselt [herbert.eiselt@highstreet-technologies.com]
15  *********************************************************************************/
16 package org.opendaylight.mwtn.base.database;
17
18 import java.io.IOException;
19 import java.net.InetAddress;
20 import java.net.UnknownHostException;
21 import java.nio.file.Files;
22 import java.nio.file.Paths;
23 import java.util.List;
24 import java.util.Set;
25
26 import javax.annotation.Nullable;
27
28 import org.elasticsearch.ElasticsearchException;
29 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
30 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
31 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
32 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
33 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
34 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
35 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
36 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
37 import org.elasticsearch.action.delete.DeleteResponse;
38 import org.elasticsearch.action.get.GetResponse;
39 import org.elasticsearch.action.index.IndexRequestBuilder;
40 import org.elasticsearch.action.index.IndexResponse;
41 import org.elasticsearch.action.search.SearchResponse;
42 import org.elasticsearch.client.Client;
43 import org.elasticsearch.client.transport.TransportClient;
44 import org.elasticsearch.cluster.node.DiscoveryNode;
45 import org.elasticsearch.common.bytes.BytesReference;
46 import org.elasticsearch.common.settings.Settings;
47 import org.elasticsearch.common.transport.InetSocketTransportAddress;
48 import org.elasticsearch.common.unit.TimeValue;
49 import org.elasticsearch.index.query.QueryBuilder;
50 import org.elasticsearch.index.query.QueryBuilders;
51 import org.elasticsearch.search.SearchHit;
52 import org.json.JSONObject;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * @author Herbert
58  *
59  */
60 public class HtDatabaseClientAbstract implements HtDataBase, AutoCloseable {
61
62     private final Logger log = LoggerFactory.getLogger(HtDatabaseClientAbstract.class);
63
64     private static int DELAYSECONDS = 10;
65     private Client client;
66     private String esIndexAlias;
67
68     /**
69      * Full database initialization.
70      * @param esIndex Database index
71      * @param esNodeserverName Servername or Server-IP that hosts the node.
72      * @param esClusterName Name of the cluster
73      * @param esNodeName  Name of the node within the cluster to connect to.
74      * @throws UnknownHostException Servername not known.
75      */
76     public HtDatabaseClientAbstract(String esIndex, String esNodeserverName, String esClusterName, String esNodeName) throws UnknownHostException {
77
78         this.esIndexAlias = esIndex;
79
80         Settings settings = Settings.settingsBuilder()
81                 .put("cluster.name", esClusterName)
82                 .put("node.name", esNodeName)
83                 .build();
84         this.client = getClient(esNodeserverName, settings);
85
86     }
87
88     /**
89      * Do not use the hostname for getting the client
90      * @param esIndex
91      * @param esClusterName
92      * @param esNodeName
93      * @throws UnknownHostException
94      */
95     public HtDatabaseClientAbstract(String esIndex, String esClusterName, String esNodeName) throws UnknownHostException {
96
97         this.esIndexAlias = esIndex;
98         Settings settings = Settings.settingsBuilder()
99                 .put("cluster.name", esClusterName)
100                 .put("node.name", esNodeName)
101                 .build();
102         this.client = getClient(null, settings);
103     }
104
105
106     /**
107      * Simple database initialization. Query all ES configuration information from cluster node.
108      * @param esIndex Database index
109      * @param esNodeserverHostName Servername or Server-IP that hosts the node.
110      * @throws UnknownHostException Servername not known.
111      */
112
113     public HtDatabaseClientAbstract(String esIndex, String esNodeserverHostName) throws UnknownHostException {
114
115         this.esIndexAlias = esIndex;
116
117         Settings settings = Settings.settingsBuilder()
118                 .put("client.transport.ignore_cluster_name",true)
119                 .put("client.transport.sniff", true)
120                 .build();
121         this.client = getClient(esNodeserverHostName, settings);
122     }
123
124     /**
125      * Simple database initialization. Query all ES configuration information from cluster node.
126      * @param esIndex Database index
127      * @param database databse node descriptor
128      */
129     public HtDatabaseClientAbstract(String esIndex, HtDatabaseNode database)  {
130
131         this.esIndexAlias = esIndex;
132         this.client = database.getClient();
133     }
134
135
136     /*----------------------------------
137      * some constructing functions, used by public constructors
138      */
139     /**
140      *
141      * @param esNodeserverName
142      * @param settings
143      * @return
144      * @throws UnknownHostException
145      */
146     private final TransportClient getClient(@Nullable String esNodeserverName, Settings settings) throws UnknownHostException {
147
148         TransportClient newClient = TransportClient.builder().settings(settings).build();
149
150         if (esNodeserverName != null) {
151                 InetAddress nodeIp = InetAddress.getByName(esNodeserverName);
152                         newClient.addTransportAddress(new InetSocketTransportAddress(nodeIp, 9300));
153         }
154
155         setup(newClient);
156         return newClient;
157     }
158
159     private void setup(TransportClient newClient) {
160         NodesInfoResponse nodeInfos = newClient.admin().cluster().prepareNodesInfo().get();
161         String clusterName = nodeInfos.getClusterName().value();
162
163         // ------ Debug/ Info
164         StringBuffer logInfo = new StringBuffer();
165         logInfo.append("Create ES Client an localhost for Cluster '");
166         logInfo.append(clusterName);
167         logInfo.append("' for index '");
168         logInfo.append(esIndexAlias);
169         logInfo.append("' Nodelist: ");
170         for (DiscoveryNode node : newClient.connectedNodes()) {
171             logInfo.append("(");
172             logInfo.append(node.toString());
173             logInfo.append(") ");
174         }
175         log.info(logInfo.toString());
176         // ------ Debug/ Info
177
178         log.info("Starting Database service. Short wait.");
179
180                 ClusterHealthResponse nodeStatus = newClient.admin().cluster().prepareHealth()
181                                 .setWaitForGreenStatus()
182                                 //.setWaitForYellowStatus()
183                         .setTimeout(TimeValue.timeValueSeconds(DELAYSECONDS))
184                         .get();
185                 log.debug("Elasticsearch client started with status {}",nodeStatus.toString());
186
187
188         List<DiscoveryNode> nodeList = newClient.connectedNodes();
189
190         if (nodeList.isEmpty()) {
191             log.info("ES Client created for nodes: <empty node list>");
192         } else {
193             int t=0;
194             for (DiscoveryNode dn : nodeList) {
195                 log.info("ES Client created for node#{}: {}",t , dn.getName());
196             }
197         }
198
199         Runtime.getRuntime().addShutdownHook(new Thread(){
200             @Override public void run(){
201                 log.info("Shutdown node "+HtDatabaseClientAbstract.class.getSimpleName());
202             }
203         });
204
205         log.info("Database service started.");
206
207     }
208
209
210     /*----------------------------------
211      * Getter / Setter
212      */
213
214     @Override
215     public String getNetworkIndex() {
216         return esIndexAlias;
217     }
218
219     @Override
220     public void setNetworkIndex(String es_index) {
221         this.esIndexAlias = es_index;
222     }
223
224     @Override
225     public Client getClient() {
226         return client;
227     }
228
229     /*----------------------------------
230      * Functions
231      */
232
233     /**
234      * Close function
235      */
236     public void close() {
237         client.close();
238     }
239
240     /**
241      * Create an ES index. Delete an existing index.
242      */
243     public void doDeleteIndex() {
244         log.info("Remove index {}", esIndexAlias);
245
246         if (esIndexAlias == null) {
247             throw new IllegalArgumentException("Missing Index");
248         }
249
250         try {
251
252             // Delete index
253             IndicesExistsResponse res = client.admin().indices().prepareExists(esIndexAlias)
254                     .execute()
255                     .actionGet();
256
257             if (res.isExists()) {
258                 log.info("Delete Index start: {}",esIndexAlias);
259                 DeleteIndexRequestBuilder delIdx = client.admin().indices().prepareDelete(esIndexAlias);
260                 delIdx.execute().actionGet();
261                 log.info("Delete Index done.");
262             }
263
264         } catch (ElasticsearchException e) {
265             log.warn(e.getDetailedMessage());
266         }
267     }
268
269     /**
270      * Verify if index already created
271      * @return boolean accordingly
272      */
273     public boolean isExistsIndex() {
274
275         if (esIndexAlias == null) {
276             throw new IllegalArgumentException("Missing Index");
277         }
278
279         log.debug("Check status of ES index: {}", esIndexAlias);
280
281         final IndicesExistsResponse indexStatus = client.admin()
282             .indices().
283             prepareExists(esIndexAlias).
284             execute().
285             actionGet();
286
287         return indexStatus.isExists();
288
289     }
290
291
292     /**
293      * Create and write the mapping and setting of the index
294      * @param jsonString with mapping and setting definition Object or null for no configuration
295      */
296     public void doCreateIndexWithMapping(JSONObject jsonIndexMappingSetting) {
297
298         if (esIndexAlias == null) {
299             throw new IllegalArgumentException("Missing Index");
300         }
301
302         try {
303                 String esIndexName = esIndexAlias+"_v1";
304             log.debug("Create not existing ES index: {} with alias:{}", esIndexName, esIndexAlias);
305
306             //Create index with mapping
307             CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(esIndexName);
308
309             if(jsonIndexMappingSetting!=null) {
310                 // Handle optional mappings if requested
311                 JSONObject jsonMapping = jsonIndexMappingSetting.optJSONObject("mappings");
312                 if (jsonMapping != null) {
313                         log.debug("Set mapping for index {} {}", esIndexAlias, jsonMapping);
314                         Set<?> keys = jsonMapping.keySet();
315                         log.debug("Found length:"+jsonMapping.length()+" keys:"+keys.size());
316                         for (Object key : keys) {
317                                 String docType = (String)key;
318                                 log.debug("Doctype:{} mapping:{}",docType,jsonMapping.getJSONObject(docType).toString());
319                                 createIndexRequestBuilder.addMapping(docType, jsonMapping.getJSONObject(docType).toString());
320                         }
321                 } else {
322                         log.debug("No mapping requested for index {}", esIndexAlias);
323                 }
324                 // Handle optional settings if requested
325                 JSONObject jsonSettings = jsonIndexMappingSetting.optJSONObject("settings");
326                 if (jsonSettings != null) {
327                         log.debug("Set setting for index {} {}", esIndexAlias, jsonSettings);
328                         createIndexRequestBuilder.setSettings(Settings.settingsBuilder().loadFromSource(jsonSettings.toString()));
329                 } else {
330                         log.debug("No settings requested for index {}", esIndexAlias);
331                 }
332             }
333
334
335             CreateIndexResponse createResponse = createIndexRequestBuilder.execute().actionGet();
336             log.debug("CreateIndex response {}",createResponse);
337
338             {
339             //Set Alias
340             log.debug("Set alias {} to index {}",esIndexAlias, esIndexName);
341             IndicesAliasesResponse setAliasResponse = client.admin().indices().prepareAliases().addAlias(esIndexName,esIndexAlias)
342                         .execute().actionGet();
343             log.debug("CreateIndex response {}",setAliasResponse);
344             }
345
346         } catch (ElasticsearchException e) {
347                 log.warn("ElasticsearchException: {}",e.getDetailedMessage());
348         }
349     }
350
351     /**
352      * Create Index with alias according to definition, but no mapping
353      */
354     public void doCreateIndex() {
355         doCreateIndexWithMapping(null);
356     }
357
358     /**
359      * Write a JSON mapping definition for a document from a file to ES
360      * Hint: A change of the mapping is not possible.
361      * @param documentType Document type in focus
362      * @param jsonString String with mapping definition in JSON Format
363      */
364
365     public void doWriteMappingJson( String jsonString) throws IllegalArgumentException {
366
367         if (esIndexAlias == null) {
368             throw new IllegalArgumentException("Missing Index");
369         }
370         if (jsonString == null) {
371             String s = "Mapping string parameter is null";
372             log.warn(s);
373             throw new IllegalArgumentException(s);
374         }
375
376         try {
377             // MAPPING GOES HERE
378             log.debug("Check status of ES index: {}", esIndexAlias);
379
380             final IndicesExistsResponse indexStatus = client.admin()
381                 .indices().
382                 prepareExists(esIndexAlias).
383                 execute().
384                 actionGet();
385
386             if (indexStatus.isExists()) {
387                 log.debug("ES index exists: {}", esIndexAlias);
388                 // TODO: CHANGE Mapping is not working. This here works only for new datatypes
389
390                 PutMappingResponse res= client.admin().indices()
391                 .preparePutMapping(esIndexAlias)
392                 .setSource(jsonString)
393                 .execute()
394                 .actionGet();
395                 log.debug("Result: {}", res.toString());
396
397             } else {
398                 log.debug("Create not existing ES index: {}", esIndexAlias);
399
400                 CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(esIndexAlias);
401                 createIndexRequestBuilder
402                 .addMapping(jsonString)
403                 .execute()
404                 .actionGet();
405             }
406
407         } catch (ElasticsearchException e) {
408             log.warn(e.getDetailedMessage());
409         }
410     }
411
412     /**
413      * Write a Json mapping definition for a document from a file to ES
414      * @param fileName Filename with json definition.
415      */
416     public void doWriteMappingFromFile(String fileName) {
417
418
419         log.info("Write mapping from File: {}", fileName);
420
421         if (esIndexAlias == null) {
422             throw new IllegalArgumentException("Missing Index");
423         }
424
425         if (fileName == null) {
426             log.warn("No mapping for {} specified in parameter file.", esIndexAlias);
427             return;
428         }
429
430         String content = null;
431
432         try {
433             content = new String(Files.readAllBytes(Paths.get(fileName)),"UTF-8");
434         } catch (IOException e1) {
435             log.warn("Can not read file: {}",e1.getMessage());
436         }
437
438         doWriteMappingJson(content);
439
440     }
441
442     /**
443      * Write list with json objects from json files
444      * @param docTypeAndFileName List with 2 String Array.
445      *      String[0] Contains the dataType name
446      *      String[1] Contains the filename
447      */
448     public void doWriteJsonFiles( List<String[]> docTypeAndFileName ) {
449
450         log.debug("Write JSONFiles: {}", docTypeAndFileName.size());
451         if (docTypeAndFileName != null) {
452             int t = 1;
453             for (String[] s : docTypeAndFileName) {
454                 if (s.length == 2) {
455                     writeJsonObjectsFromFile( s[0], s[1]);
456                 } else {
457                     log.warn("Wrong parameters number. Entry: {}", t);
458                 }
459                 t++;
460             }
461         }
462     }
463
464     /**
465      * Write one object into Database
466      * @param esId Database index
467      * @param dataTypeName Name of datatype
468      * @param json String in JSON format.
469      * @return esId of the object
470      */
471     @Override
472     public String doWrite( String dataTypeName, IsEsObject esId, String json) {
473         return doWrite(dataTypeName, esId, json.getBytes());
474     }
475
476     /**
477      * Write one object into Database
478      * @param esId Database index
479      * @param dataTypeName Name of datatype
480      * @param json String in JSON format.
481      * @return esId of the object
482      */
483
484     @Override
485     public String doWrite( String dataTypeName, IsEsObject esId, byte[] json) {
486        return doWrite(dataTypeName, esId.getEsId(), json);
487     }
488
489     /**
490      * Write one object into Database
491      * @param dataTypeName
492      * @param id id of the object or null
493      * @param json Object as json
494      * @return esId of the Object
495      */
496     public String dowrite(String dataTypeName, String id, JSONObject json) {
497        return doWrite(dataTypeName, id, json.toString().getBytes());
498     }
499
500     /**
501      * Write one object into Database
502      * @param esId Database index or null
503      * @param dataTypeName Name of datatype
504      * @param json String in JSON format.
505      * @return esId of the object
506      */
507
508     public String doWrite( String dataTypeName, String esId, byte[] json) {
509
510         if (esIndexAlias == null) {
511             throw new IllegalArgumentException("Missing Index");
512         }
513
514         IndexRequestBuilder request = esId == null || esId.isEmpty() ?
515                 client.prepareIndex(esIndexAlias, dataTypeName) :
516                     client.prepareIndex(esIndexAlias, dataTypeName, esId);
517
518                 IndexResponse response = null;
519                 try {
520                     response = request.setSource(json).execute().actionGet();
521                 } catch (ElasticsearchException e) {
522                     log.warn("ES Exception {} Json: {}", e.getMessage(), new String(json));
523                 }
524
525                 if (response == null) {
526                     log.warn("Response null during write: {} {}", esId, new String(json));
527                     return null;
528                 } else {
529                     return response.getId();
530                 }
531     }
532
533     /**
534      * Write JSON Data. First level contains datatype, next level id
535      * Example
536      *      "datatype" : {
537      *           "id" : {
538      *             }
539      *           }
540      *
541      * @param da
542      */
543         public void doWriteJSONObject(JSONObject json) {
544
545                 Set<?> docTypes = json.keySet();
546                 log.debug("Found number of keys:"+json.length()+" keys:"+docTypes.size());
547                 for (Object docTypeKey : docTypes) {
548                         String docType = (String)docTypeKey;
549                         JSONObject objects = json.optJSONObject(docType);
550                         if (objects == null) {
551                                 log.debug("Skip json {} with class {}",docType, json.get(docType).getClass());
552                         } else {
553                                 Set<?> ids = objects.keySet();
554                                 log.debug("write doctype {} with elements {}",docType,ids.size());
555                                 for (Object idKey : ids) {
556                                         String id = (String)idKey;
557
558                                         JSONObject jsonIdObject = objects.optJSONObject(id);
559                                         if (jsonIdObject == null) {
560                                                 log.debug("Skip jsonsub {} with class {}",id, objects.get(id).getClass());
561                                         } else {
562                                                 if (log.isTraceEnabled()) {
563                                                         log.trace("Jsonsub object of id {} '{}'", id, jsonIdObject);
564                                                 }
565                                                 this.doWrite(docType, id, jsonIdObject.toString().getBytes());
566                                         }
567                                 }
568                         }
569                 }
570         }
571
572
573         /**
574      * Remove Object from database
575      */
576     @Override
577     public boolean doRemove( String dataTypeName, IsEsObject esId ) {
578
579         if (esIndexAlias == null) {
580             throw new IllegalArgumentException("Missing Index");
581         }
582
583         DeleteResponse response = client.prepareDelete(esIndexAlias, dataTypeName, esId.getEsId())
584                 .execute()
585                 .actionGet();
586
587         return response.isFound();
588     }
589
590     /**
591      * Read Json Object from database
592      */
593     @Override
594     public BytesReference doReadJsonData( String dataTypeName, IsEsObject esId ) {
595
596         log.debug("NetworkIndex: {}",esIndexAlias);
597         if (esId.getEsId() == null) {
598             throw new IllegalArgumentException("Read access to object without database Id");
599         }
600
601         GetResponse response = client.prepareGet(esIndexAlias, dataTypeName, esId.getEsId())
602                 //.setOperationThreaded(false)
603                 .execute()
604                 .actionGet();
605
606         BytesReference json = response.getSourceAsBytesRef();
607         return json;
608     }
609
610     @Override
611     public SearchHit[] doReadByQueryJsonData( int start, int length, String dataTypeName, QueryBuilder qb ) {
612
613         log.debug("NetworkIndex: {}",esIndexAlias);
614
615         SearchResponse response1 = client.prepareSearch(esIndexAlias)
616                 .setTypes(dataTypeName)
617                 .setQuery( qb )
618                 .setFrom(start).setSize(length)
619                 .execute().actionGet();
620
621         SearchHit hits[] = response1.getHits().hits();
622         return hits;
623     }
624
625
626     @Override
627     public SearchHit[] doReadAllJsonData( int start, int length, String dataTypeName ) {
628
629         log.debug("NetworkIndex: {}",esIndexAlias);
630
631         //Use query
632         QueryBuilder qb = QueryBuilders.matchAllQuery();
633
634         SearchResponse response1 = client.prepareSearch(esIndexAlias)
635                 .setTypes(dataTypeName)
636                 .setQuery( qb )
637                 .setFrom(start).setSize(length)
638                 .execute().actionGet();
639
640         SearchHit hits[] = response1.getHits().hits();
641         return hits;
642     }
643
644
645
646     /**
647      * Write Json datetype that is specified by file to ES
648      * @param dataType    ES Datatype name
649      * @param fileName    file name
650      */
651     public void writeJsonObjectsFromFile( String dataType, String fileName ) {
652
653         log.debug("Start: Index: '{}' ' datatype: '{}'  File: '{}'", esIndexAlias, dataType, fileName);
654
655         String content = null;
656
657         try {
658             content = new String( Files.readAllBytes(Paths.get(fileName)), "UTF-8");
659         } catch (IOException e1) {
660             log.warn("Can not read file: {}",e1.getMessage());
661         }
662
663         if (content != null && content.charAt(0) == 0xfeff) {
664             content = content.substring(1);
665             log.debug("Delete first char {} {}", dataType, fileName);
666         }
667
668         if (content != null) {
669             IndexResponse response = null;
670             try {
671                 response = client.prepareIndex(esIndexAlias, dataType)
672                         .setSource(content)
673                         .execute()
674                         .actionGet();
675             } catch (ElasticsearchException e) {
676                 log.error("ElasticsearchException during write:  for {} from {}", e.getMessage(), dataType, fileName);
677             } catch (Exception e) {
678                 log.error("Exception during write:  for {} from {}", e.getMessage(), dataType, fileName);
679             }
680
681             if (response != null) {
682                 if (! response.isCreated()) {
683                     log.warn("Jackson Response not created: {} {} {}", response.toString(), dataType, fileName);
684                 } else {
685                     log.debug("Created: {}", response.getId());
686                 }
687             } else {
688                 log.warn("Jackson Response null after write {} {}", dataType, fileName);
689             }
690         }
691
692     }
693
694     @Override
695     public void closeDb() {
696     }
697
698 }