aee63ed75cccefc97d091644210e01cfec502f11
[dcaegen2/services.git] /
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DATALAKE
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20
21 package org.onap.datalake.feeder.service.db;
22
import java.io.IOException;
import java.util.List;
import java.util.Locale;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
57
58 /**
59  * Elasticsearch Service for table creation, data submission, as well as data pre-processing. 
60  * 
61  * @author Guobiao Mo
62  *
63  */
64 @Service
65 public class ElasticsearchService implements DbStoreService {
66
67         private final Logger log = LoggerFactory.getLogger(this.getClass());
68         
69         private Db elasticsearch;
70
71         @Autowired
72         private ApplicationConfiguration config;
73
74         //@Autowired
75 //      private DbService dbService;
76
77         private RestHighLevelClient client;
78         ActionListener<BulkResponse> listener;
79
80         public ElasticsearchService( ) {
81                 
82         }
83         public ElasticsearchService(Db db) {
84                 elasticsearch = db;
85         }
86         
87         //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
88         //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
89         @PostConstruct
90         private void init() {
91                 //Db elasticsearch = dbService.getElasticsearch();
92                 String elasticsearchHost = elasticsearch.getHost();
93
94                 // Initialize the Connection
95                 client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
96
97                 log.info("Connected to Elasticsearch Host {}", elasticsearchHost);
98
99                 listener = new ActionListener<BulkResponse>() {
100                         @Override
101                         public void onResponse(BulkResponse bulkResponse) {
102                                 if(bulkResponse.hasFailures()) {
103                                         log.debug(bulkResponse.buildFailureMessage());
104                                 }
105                         }
106
107                         @Override
108                         public void onFailure(Exception e) {
109                                 log.error(e.getMessage());
110                         }
111                 };
112         }
113
114         @PreDestroy
115         public void cleanUp() throws IOException {
116                 config.getShutdownLock().readLock().lock();
117
118                 try {
119                         log.info("cleanUp() closing Elasticsearch client.");
120                         client.close();
121                 } catch (IOException e) {
122                         log.error("client.close() at cleanUp.", e);
123                 } finally {
124                         config.getShutdownLock().readLock().unlock();
125                 }
126         }
127
128         public void ensureTableExist(String topic) throws IOException {
129                 String topicLower = topic.toLowerCase();
130
131                 GetIndexRequest request = new GetIndexRequest(topicLower);
132
133                 boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
134                 if (!exists) {
135                         //TODO submit mapping template
136                         CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
137                         CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
138                         log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
139                 }
140         }
141
142         //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
143         @Override
144         public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
145                 
146                 BulkRequest request = new BulkRequest();
147
148                 for (JSONObject json : jsons) {
149                         if (effectiveTopic.getTopic().isCorrelateClearedMessage()) {
150                                 boolean found = correlateClearedMessage(effectiveTopic.getTopic(), json);
151                                 if (found) {
152                                         continue;
153                                 }
154                         }                       
155                         
156                         String id = effectiveTopic.getTopic().getMessageId(json); //id can be null
157                         
158                         request.add(new IndexRequest(effectiveTopic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
159                 }
160
161                 log.debug("saving text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size());
162
163                 if (config.isAsync()) {
164                         client.bulkAsync(request, RequestOptions.DEFAULT, listener);
165                 } else {
166                         try {
167                                 BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
168                                 if(bulkResponse.hasFailures()) {
169                                         log.debug(bulkResponse.buildFailureMessage());
170                                 }
171                         } catch (IOException e) {
172                                 log.error(effectiveTopic.getName(), e);
173                         }
174                 }
175                 
176         }
177         
178         /**
179          *
180          * @param topic
181          * @param json
182          * @return boolean
183          *
184          *         Because of query by id, The search API cannot be used for query. The
185          *         search API can only query all data or based on the fields in the
186          *         source. So use the get API, three parameters: index, type, document
187          *         id
188          */
189         private boolean correlateClearedMessage(Topic topic, JSONObject json) {
190                 boolean found = false;
191                 String eName = null;
192
193                 try {
194                         eName = json.query("/event/commonEventHeader/eventName").toString();
195
196                         if (StringUtils.isNotBlank(eName) && eName.endsWith("Cleared")) {
197
198                                 String name = eName.substring(0, eName.length() - 7);
199                                 String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
200                                 String specificProblem = json.query("/event/faultFields/specificProblem").toString();
201
202                                 String id = String.join("^", name, reportingEntityName, specificProblem);//example: id = "aaaa^cccc^bbbbb"
203                                 String index = topic.getName().toLowerCase();
204
205                                 //get
206                                 GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
207
208                                 GetResponse getResponse = null;
209                                 try {
210                                         getResponse = client.get(getRequest, RequestOptions.DEFAULT);
211                                         if (getResponse != null) {
212
213                                                 if (getResponse.isExists()) {
214                                                         String sourceAsString = getResponse.getSourceAsString();
215                                                         JSONObject jsonObject = new JSONObject(sourceAsString);
216                                                         jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
217                                                         String jsonString = jsonObject.toString();
218
219                                                         //update
220                                                         IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
221                                                         request.source(jsonString, XContentType.JSON);
222                                                         IndexResponse indexResponse = null;
223                                                         try {
224                                                                 indexResponse = client.index(request, RequestOptions.DEFAULT);
225                                                                 found = true;
226                                                         } catch (IOException e) {
227                                                                 log.error("save failure");
228                                                         }
229                                                 } else {
230                                                         log.error("The getResponse was not exists");
231                                                 }
232
233                                         } else {
234                                                 log.error("The document for this id was not found");
235                                         }
236
237                                 } catch (ElasticsearchException e) {
238                                         if (e.status() == RestStatus.NOT_FOUND) {
239                                                 log.error("The document for this id was not found");
240                                         }
241                                         if (e.status() == RestStatus.CONFLICT) {
242                                                 log.error("Version conflict");
243                                         }
244                                         log.error("Get document exception", e);
245                                 } catch (IOException e) {
246                                         log.error(topic.getName(), e);
247                                 }
248
249                         }
250
251                 } catch (Exception e) {
252                         log.error("error", e);
253                 }
254
255                 return found;
256         }
257
258 }