2c16b2b867e2d251febae264a99b3b495703684d
[dcaegen2/services.git] /
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DATALAKE
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20
21 package org.onap.datalake.feeder.service;
22
23 import java.io.IOException;
24 import java.util.List;
25
26 import javax.annotation.PostConstruct;
27 import javax.annotation.PreDestroy;
28
29 import org.apache.http.HttpHost;
30 import org.elasticsearch.action.ActionListener;
31 import org.elasticsearch.client.indices.CreateIndexRequest;
32 import org.elasticsearch.client.indices.CreateIndexResponse;
33 import org.elasticsearch.client.indices.GetIndexRequest; 
34 import org.elasticsearch.action.bulk.BulkRequest;
35 import org.elasticsearch.action.bulk.BulkResponse;
36 import org.elasticsearch.action.index.IndexRequest;
37 import org.elasticsearch.client.RequestOptions;
38 import org.elasticsearch.client.RestClient;
39 import org.elasticsearch.client.RestHighLevelClient;
40 import org.elasticsearch.common.xcontent.XContentType;
41 import org.json.JSONObject;
42 import org.onap.datalake.feeder.config.ApplicationConfiguration;
43 import org.onap.datalake.feeder.domain.Db;
44 import org.onap.datalake.feeder.domain.Topic;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.stereotype.Service;
50
51 /**
52  * Service to use Elasticsearch
53  * 
54  * @author Guobiao Mo
55  *
56  */
57 @Service
58 public class ElasticsearchService {
59
60         private final Logger log = LoggerFactory.getLogger(this.getClass());
61
62         @Autowired
63         private ApplicationConfiguration config;
64         
65         @Autowired
66         private DbService dbService;
67
68         private RestHighLevelClient client;
69         ActionListener<BulkResponse> listener;
70
71
72 //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
73 //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
74         @PostConstruct
75         private void init() {
76                 Db elasticsearch = dbService.getElasticsearch();
77                 String elasticsearchHost = elasticsearch.getHost();
78                 
79                 // Initialize the Connection
80                 client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
81
82                 log.info("Connect to Elasticsearch Host {}", elasticsearchHost);
83
84                 listener = new ActionListener<BulkResponse>() {
85                         @Override
86                         public void onResponse(BulkResponse bulkResponse) {
87
88                         }
89
90                         @Override
91                         public void onFailure(Exception e) {
92                                 log.error(e.getMessage());
93                         }
94                 };
95         }
96
97         @PreDestroy
98         public void cleanUp() throws IOException {
99                 client.close();
100         }
101         
102         public void ensureTableExist(String topic) throws IOException {
103                 String topicLower = topic.toLowerCase();
104                 
105                 GetIndexRequest request = new GetIndexRequest(topicLower); 
106                 
107                 boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
108                 if(!exists){
109                         CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); 
110                         CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);          
111                         log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
112                 }
113         }
114         
115         //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
116         public void saveJsons(Topic topic, List<JSONObject> jsons) {
117                 BulkRequest request = new BulkRequest();
118
119                 for (JSONObject json : jsons) {
120                         if(topic.isCorrelateClearedMessage()) {
121                                 boolean found = correlateClearedMessage(json);
122                                 if(found) {
123                                         continue;
124                                 }
125                         }
126                         
127                         String id = topic.getMessageId(json); //id can be null
128                         
129                         request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
130                 }
131                 if(config.isAsync()) {
132                         client.bulkAsync(request, RequestOptions.DEFAULT, listener);                    
133                 }else {
134                         try {
135                                 client.bulk(request, RequestOptions.DEFAULT);
136                         } catch (IOException e) { 
137                                 log.error( topic.getName() , e);
138                         }
139                 }
140         }
141         
142         private boolean correlateClearedMessage(JSONObject json) {
143                 boolean found = true;
144                                 
145                 /*TODO
146                  * 1. check if this is a alarm cleared message
147                  * 2. search previous alarm message
148                  * 3. update previous message, if success, set found=true
149                  */
150                 //for Sonar test, remove the following
151                 if(json.isNull("kkkkk")) {
152                         found = false;
153                 }
154                 
155                 return found; 
156         }
157
158 }